http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java index bf4ed1f..e4d2e2e 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java +++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java @@ -19,11 +19,6 @@ package org.apache.flume.sink; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; - import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; @@ -39,6 +34,11 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; + public class TestRollingFileSink { private static final Logger logger = LoggerFactory @@ -108,8 +108,8 @@ public class TestRollingFileSink { sink.stop(); for (String file : sink.getDirectory().list()) { - BufferedReader reader = new BufferedReader(new FileReader(new File( - sink.getDirectory(), file))); + BufferedReader reader = + new BufferedReader(new FileReader(new File(sink.getDirectory(), file))); String lastLine = null; String currentLine = null; @@ -157,8 +157,8 @@ public class TestRollingFileSink { sink.stop(); for (String file : sink.getDirectory().list()) { - BufferedReader reader = new BufferedReader(new FileReader(new File( - sink.getDirectory(), file))); + BufferedReader reader = + new BufferedReader(new FileReader(new File(sink.getDirectory(), file))); String lastLine = null; String currentLine = null; @@ -174,7 +174,8 @@ public class TestRollingFileSink { } @Test - public void testAppend3() throws InterruptedException, LifecycleException, EventDeliveryException, IOException { + public void testAppend3() + throws InterruptedException, LifecycleException, EventDeliveryException, IOException { File tmpDir = new File("target/tmpLog"); tmpDir.mkdirs(); cleanDirectory(tmpDir); @@ -208,7 +209,8 @@ public class TestRollingFileSink { sink.stop(); for (String file : sink.getDirectory().list()) { - BufferedReader reader = new BufferedReader(new FileReader(new File(sink.getDirectory(), file))); + BufferedReader reader = + new BufferedReader(new FileReader(new File(sink.getDirectory(), file))); String lastLine = null; String currentLine = null; @@ -223,7 +225,8 @@ public class TestRollingFileSink { } @Test - public void testRollTime() throws InterruptedException, LifecycleException, EventDeliveryException, IOException { + public void testRollTime() + throws InterruptedException, LifecycleException, EventDeliveryException, IOException { File tmpDir = new File("target/tempLog"); tmpDir.mkdirs(); cleanDirectory(tmpDir); @@ -258,7 +261,8 @@ public class TestRollingFileSink { sink.stop(); for (String file : sink.getDirectory().list()) { - BufferedReader reader = new BufferedReader(new FileReader(new File(sink.getDirectory(), file))); + BufferedReader reader = + new BufferedReader(new FileReader(new File(sink.getDirectory(), file))); String lastLine = null; String currentLine = null;
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java index 1beec76..22dcf98 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java +++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java @@ -161,8 +161,7 @@ public class TestThriftSink { @Test public void testFailedConnect() throws Exception { - Event event = EventBuilder.withBody("test event 1", - Charset.forName("UTF8")); + Event event = EventBuilder.withBody("test event 1", Charset.forName("UTF8")); sink.start(); @@ -185,7 +184,7 @@ public class TestThriftSink { threwException = true; } Assert.assertTrue("Must throw EventDeliveryException if disconnected", - threwException); + threwException); } src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(), @@ -283,7 +282,8 @@ public class TestThriftSink { Assert.assertTrue(LifecycleController.waitForOneOf(sink, LifecycleState.STOP_OR_ERROR, 5000)); if (failed) { - Assert.fail("SSL-enabled sink successfully connected to a non-SSL-enabled server, that's wrong."); + Assert.fail("SSL-enabled sink successfully connected to a non-SSL-enabled server, " + + "that's wrong."); } } @@ -329,7 +329,8 @@ public class TestThriftSink { Assert.assertTrue(LifecycleController.waitForOneOf(sink, LifecycleState.STOP_OR_ERROR, 5000)); if (failed) { - Assert.fail("SSL-enabled sink successfully connected to a server with an untrusted certificate when it should have failed"); + Assert.fail("SSL-enabled sink successfully connected to a server with an " + + "untrusted certificate when it should have failed"); } } } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/source/TestAbstractPollableSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestAbstractPollableSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestAbstractPollableSource.java index d385abe..0902db9 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestAbstractPollableSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestAbstractPollableSource.java @@ -90,10 +90,12 @@ public class TestAbstractPollableSource { Context context = new Context(inputConfigs); source.configure(context); - Assert.assertEquals("BackOffSleepIncrement should equal 42 but it equals " + source.getBackOffSleepIncrement(), - 42l, source.getBackOffSleepIncrement()); - Assert.assertEquals("BackOffSleepIncrement should equal 42 but it equals " + source.getMaxBackOffSleepInterval(), - 4242l, source.getMaxBackOffSleepInterval()); + Assert.assertEquals("BackOffSleepIncrement should equal 42 but it equals " + + source.getBackOffSleepIncrement(), + 42L, source.getBackOffSleepIncrement()); + Assert.assertEquals("BackOffSleepIncrement should equal 42 but it equals " + + source.getMaxBackOffSleepInterval(), + 4242L, source.getMaxBackOffSleepInterval()); } @Test @@ -119,14 +121,16 @@ public class TestAbstractPollableSource { HashMap<String, String> inputConfigs = new HashMap<String,String>(); Assert.assertEquals("BackOffSleepIncrement should equal " + - PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT + - " but it equals " + source.getBackOffSleepIncrement(), - PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT, source.getBackOffSleepIncrement()); + PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT + + " but it equals " + source.getBackOffSleepIncrement(), + PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT, + source.getBackOffSleepIncrement()); Assert.assertEquals("BackOffSleepIncrement should equal " + - PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP + - " but it equals " + source.getMaxBackOffSleepInterval(), - PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP, source.getMaxBackOffSleepInterval()); + PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP + + " but it equals " + source.getMaxBackOffSleepInterval(), + PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP, + source.getMaxBackOffSleepInterval()); } } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java index c75d098..d73e5ad 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java @@ -135,42 +135,47 @@ public class TestAvroSource { } @Test - public void testRequestWithCompressionOnClientAndServerOnLevel0() throws InterruptedException, IOException { + public void testRequestWithCompressionOnClientAndServerOnLevel0() + throws InterruptedException, IOException { doRequest(true, true, 0); } @Test - public void testRequestWithCompressionOnClientAndServerOnLevel1() throws InterruptedException, IOException { + public void testRequestWithCompressionOnClientAndServerOnLevel1() + throws InterruptedException, IOException { doRequest(true, true, 1); } @Test - public void testRequestWithCompressionOnClientAndServerOnLevel6() throws InterruptedException, IOException { + public void testRequestWithCompressionOnClientAndServerOnLevel6() + throws InterruptedException, IOException { doRequest(true, true, 6); } @Test - public void testRequestWithCompressionOnClientAndServerOnLevel9() throws InterruptedException, IOException { + public void testRequestWithCompressionOnClientAndServerOnLevel9() + throws InterruptedException, IOException { doRequest(true, true, 9); } - @Test(expected=org.apache.avro.AvroRemoteException.class) + @Test(expected = org.apache.avro.AvroRemoteException.class) public void testRequestWithCompressionOnServerOnly() throws InterruptedException, IOException { //This will fail because both client and server need compression on doRequest(true, false, 6); } - @Test(expected=org.apache.avro.AvroRemoteException.class) + @Test(expected = org.apache.avro.AvroRemoteException.class) public void testRequestWithCompressionOnClientOnly() throws InterruptedException, IOException { //This will fail because both client and server need compression on doRequest(false, true, 6); } - private void doRequest(boolean serverEnableCompression, boolean clientEnableCompression, int compressionLevel) throws InterruptedException, IOException { + private void doRequest(boolean serverEnableCompression, boolean clientEnableCompression, + int compressionLevel) throws InterruptedException, IOException { boolean bound = false; for (int i = 0; i < 100 && !bound; i++) { @@ -428,8 +433,7 @@ public class TestAvroSource { false, false); try { doIpFilterTest(localhost, null, false, false); - Assert.fail( - "The null ipFilterRules config should have thrown an exception."); + Assert.fail("The null ipFilterRules config should have thrown an exception."); } catch (FlumeException e) { //Do nothing } @@ -502,15 +506,15 @@ public class TestAvroSource { try { if (testWithSSL) { nettyTransceiver = new NettyTransceiver( - new InetSocketAddress (dest, selectedPort), - new SSLChannelFactory()); + new InetSocketAddress(dest, selectedPort), + new SSLChannelFactory()); client = SpecificRequestor.getClient( - AvroSourceProtocol.class, nettyTransceiver); + AvroSourceProtocol.class, nettyTransceiver); } else { nettyTransceiver = new NettyTransceiver( - new InetSocketAddress (dest, selectedPort)); + new InetSocketAddress(dest, selectedPort)); client = SpecificRequestor.getClient( - AvroSourceProtocol.class, nettyTransceiver); + AvroSourceProtocol.class, nettyTransceiver); } AvroFlumeEvent avroEvent = new AvroFlumeEvent(); @@ -523,7 +527,7 @@ public class TestAvroSource { Assert.assertEquals(Status.OK, status); } catch (IOException e) { Assert.assertTrue("Should have been allowed: " + ruleDefinition, - !eventShouldBeAllowed); + !eventShouldBeAllowed); return; } finally { if (nettyTransceiver != null) { http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java index afa93bf..6204d13 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java @@ -19,20 +19,8 @@ package org.apache.flume.source; - -import static org.junit.Assert.*; - -import java.io.*; -import java.lang.management.ManagementFactory; -import java.nio.charset.Charset; -import java.util.*; -import java.util.regex.Pattern; - -import javax.management.Attribute; -import javax.management.AttributeList; -import javax.management.MBeanServer; -import javax.management.ObjectName; - +import com.google.common.base.Charsets; +import com.google.common.collect.Lists; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.SystemUtils; @@ -47,22 +35,37 @@ import org.apache.flume.channel.MemoryChannel; import org.apache.flume.channel.ReplicatingChannelSelector; import org.apache.flume.conf.Configurables; import org.apache.flume.lifecycle.LifecycleException; -import org.junit.*; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; -import com.google.common.base.Charsets; -import com.google.common.collect.Lists; -import com.google.common.io.Files; +import javax.management.Attribute; +import javax.management.AttributeList; +import javax.management.MBeanServer; +import javax.management.ObjectName; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.management.ManagementFactory; +import java.nio.charset.Charset; +import java.util.List; +import java.util.regex.Pattern; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class TestExecSource { private AbstractSource source; private Channel channel = new MemoryChannel(); - private Context context = new Context(); - private ChannelSelector rcs = new ReplicatingChannelSelector(); - @Before public void setUp() { context.put("keep-alive", "1"); @@ -82,19 +85,19 @@ public class TestExecSource { // Remove the MBean registered for Monitoring ObjectName objName = null; try { - objName = new ObjectName("org.apache.flume.source" - + ":type=" + source.getName()); + objName = new ObjectName("org.apache.flume.source" + + ":type=" + source.getName()); - ManagementFactory.getPlatformMBeanServer().unregisterMBean(objName); + ManagementFactory.getPlatformMBeanServer().unregisterMBean(objName); } catch (Exception ex) { System.out.println("Failed to unregister the monitored counter: " - + objName + ex.getMessage()); + + objName + ex.getMessage()); } } @Test public void testProcess() throws InterruptedException, LifecycleException, - EventDeliveryException, IOException { + EventDeliveryException, IOException { // Generates a random files for input\output File inputFile = File.createTempFile("input", null); @@ -104,16 +107,15 @@ public class TestExecSource { // Generates input file with a random data set (10 lines, 200 characters each) FileOutputStream outputStream1 = new FileOutputStream(inputFile); - for (int i=0; i<10; i++) { - outputStream1.write( - RandomStringUtils.randomAlphanumeric(200).getBytes()); - outputStream1.write('\n'); + for (int i = 0; i < 10; i++) { + outputStream1.write(RandomStringUtils.randomAlphanumeric(200).getBytes()); + outputStream1.write('\n'); } outputStream1.close(); String command = SystemUtils.IS_OS_WINDOWS ? - String.format("cmd /c type %s", inputFile.getAbsolutePath()) : - String.format("cat %s", inputFile.getAbsolutePath()); + String.format("cmd /c type %s", inputFile.getAbsolutePath()) : + String.format("cat %s", inputFile.getAbsolutePath()); context.put("command", command); context.put("keep-alive", "1"); context.put("capacity", "1000"); @@ -139,155 +141,150 @@ public class TestExecSource { transaction.close(); Assert.assertEquals(FileUtils.checksumCRC32(inputFile), - FileUtils.checksumCRC32(ouputFile)); + FileUtils.checksumCRC32(ouputFile)); } @Test public void testShellCommandSimple() throws InterruptedException, LifecycleException, - EventDeliveryException, IOException { + EventDeliveryException, IOException { if (SystemUtils.IS_OS_WINDOWS) { runTestShellCmdHelper("powershell -ExecutionPolicy Unrestricted -command", - "1..5", new String[]{"1", "2", "3", "4", "5"}); + "1..5", new String[] { "1", "2", "3", "4", "5" }); } else { runTestShellCmdHelper("/bin/bash -c", "seq 5", - new String[]{"1", "2", "3", "4", "5"}); + new String[] { "1", "2", "3", "4", "5" }); } } @Test - public void testShellCommandBackTicks() - throws InterruptedException, LifecycleException, EventDeliveryException, - IOException { + public void testShellCommandBackTicks() throws InterruptedException, LifecycleException, + EventDeliveryException, IOException { // command with backticks if (SystemUtils.IS_OS_WINDOWS) { - runTestShellCmdHelper( - "powershell -ExecutionPolicy Unrestricted -command", "$(1..5)", - new String[]{"1", "2", "3", "4", "5"}); + runTestShellCmdHelper("powershell -ExecutionPolicy Unrestricted -command", "$(1..5)", + new String[] { "1", "2", "3", "4", "5" }); } else { runTestShellCmdHelper("/bin/bash -c", "echo `seq 5`", - new String[]{"1 2 3 4 5"}); + new String[] { "1 2 3 4 5" }); runTestShellCmdHelper("/bin/bash -c", "echo $(seq 5)", - new String[]{"1 2 3 4 5"}); + new String[] { "1 2 3 4 5" }); } } @Test public void testShellCommandComplex() - throws InterruptedException, LifecycleException, EventDeliveryException, - IOException { + throws InterruptedException, LifecycleException, EventDeliveryException, IOException { // command with wildcards & pipes String[] expected = {"1234", "abcd", "ijk", "xyz", "zzz"}; // pipes if (SystemUtils.IS_OS_WINDOWS) { - runTestShellCmdHelper( - "powershell -ExecutionPolicy Unrestricted -command", - "'zzz','1234','xyz','abcd','ijk' | sort", expected); + runTestShellCmdHelper("powershell -ExecutionPolicy Unrestricted -command", + "'zzz','1234','xyz','abcd','ijk' | sort", expected); } else { runTestShellCmdHelper("/bin/bash -c", - "echo zzz 1234 xyz abcd ijk | xargs -n1 echo | sort -f", expected); + "echo zzz 1234 xyz abcd ijk | xargs -n1 echo | sort -f", expected); } } @Test - public void testShellCommandScript() - throws InterruptedException, LifecycleException, EventDeliveryException, - IOException { + public void testShellCommandScript() throws InterruptedException, LifecycleException, + EventDeliveryException, IOException { // mini script if (SystemUtils.IS_OS_WINDOWS) { runTestShellCmdHelper("powershell -ExecutionPolicy Unrestricted -command", - "foreach ($i in 1..5) { $i }", new String[]{"1", "2", "3", "4", "5"}); + "foreach ($i in 1..5) { $i }", + new String[] { "1", "2", "3", "4", "5" }); // shell arithmetic runTestShellCmdHelper("powershell -ExecutionPolicy Unrestricted -command", - "if(2+2 -gt 3) { 'good' } else { 'not good' } ", new String[]{"good"}); + "if(2+2 -gt 3) { 'good' } else { 'not good' } ", + new String[] { "good" }); } else { - runTestShellCmdHelper("/bin/bash -c", "for i in {1..5}; do echo $i;done" - , new String[]{"1", "2", "3", "4", "5"}); + runTestShellCmdHelper("/bin/bash -c", "for i in {1..5}; do echo $i;done", + new String[] { "1", "2", "3", "4", "5" }); // shell arithmetic - runTestShellCmdHelper("/bin/bash -c", "if ((2+2>3)); " + - "then echo good; else echo not good; fi", new String[]{"good"}); + runTestShellCmdHelper("/bin/bash -c", + "if ((2+2>3)); " + "then echo good; else echo not good; fi", + new String[] { "good" }); } } @Test public void testShellCommandEmbeddingAndEscaping() - throws InterruptedException, LifecycleException, EventDeliveryException, - IOException { + throws InterruptedException, LifecycleException, EventDeliveryException, IOException { // mini script String fileName = SystemUtils.IS_OS_WINDOWS ? - "src\\test\\resources\\test_command.ps1" : - "src/test/resources/test_command.txt"; + "src\\test\\resources\\test_command.ps1" : + "src/test/resources/test_command.txt"; BufferedReader reader = new BufferedReader(new FileReader(fileName)); - try { - String shell = SystemUtils.IS_OS_WINDOWS ? - "powershell -ExecutionPolicy Unrestricted -command" : - "/bin/bash -c"; - String command1 = reader.readLine(); - Assert.assertNotNull(command1); - String[] output1 = new String[] {"'1'", "\"2\"", "\\3", "\\4"}; - runTestShellCmdHelper( shell, command1 , output1); - String command2 = reader.readLine(); - Assert.assertNotNull(command2); - String[] output2 = new String[]{"1","2","3","4","5" }; - runTestShellCmdHelper(shell, command2 , output2); - String command3 = reader.readLine(); - Assert.assertNotNull(command3); - String[] output3 = new String[]{"2","3","4","5","6" }; - runTestShellCmdHelper(shell, command3 , output3); - } finally { - reader.close(); - } + try { + String shell = SystemUtils.IS_OS_WINDOWS ? + "powershell -ExecutionPolicy Unrestricted -command" : + "/bin/bash -c"; + String command1 = reader.readLine(); + Assert.assertNotNull(command1); + String[] output1 = new String[] { "'1'", "\"2\"", "\\3", "\\4" }; + runTestShellCmdHelper(shell, command1, output1); + String command2 = reader.readLine(); + Assert.assertNotNull(command2); + String[] output2 = new String[] { "1", "2", "3", "4", "5" }; + runTestShellCmdHelper(shell, command2, output2); + String command3 = reader.readLine(); + Assert.assertNotNull(command3); + String[] output3 = new String[] { "2", "3", "4", "5", "6" }; + runTestShellCmdHelper(shell, command3, output3); + } finally { + reader.close(); } + } @Test - public void testMonitoredCounterGroup() throws InterruptedException, LifecycleException, - EventDeliveryException, IOException { + public void testMonitoredCounterGroup() + throws InterruptedException, LifecycleException, EventDeliveryException, IOException { // mini script if (SystemUtils.IS_OS_WINDOWS) { runTestShellCmdHelper("powershell -ExecutionPolicy Unrestricted -command", - "foreach ($i in 1..5) { $i }" - , new String[]{"1", "2", "3", "4", "5"}); + "foreach ($i in 1..5) { $i }", + new String[] { "1", "2", "3", "4", "5" }); } else { - runTestShellCmdHelper("/bin/bash -c", "for i in {1..5}; do echo $i;done" - , new String[]{"1", "2", "3", "4", "5"}); + runTestShellCmdHelper("/bin/bash -c", "for i in {1..5}; do echo $i;done", + new String[] { "1", "2", "3", "4", "5" }); } ObjectName objName = null; try { - objName = new ObjectName("org.apache.flume.source" - + ":type=" + source.getName()); - - MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer(); - String strAtts[] = {"Type", "EventReceivedCount", "EventAcceptedCount"}; - AttributeList attrList = mbeanServer.getAttributes(objName, strAtts); - - Assert.assertNotNull(attrList.get(0)); - Assert.assertEquals("Expected Value: Type", "Type", - ((Attribute) attrList.get(0)).getName()); - Assert.assertEquals("Expected Value: SOURCE", "SOURCE", - ((Attribute) attrList.get(0)).getValue()); - - Assert.assertNotNull(attrList.get(1)); - Assert.assertEquals("Expected Value: EventReceivedCount", "EventReceivedCount", - ((Attribute) attrList.get(1)).getName()); - Assert.assertEquals("Expected Value: 5", "5", - ((Attribute) attrList.get(1)).getValue().toString()); - - Assert.assertNotNull(attrList.get(2)); - Assert.assertEquals("Expected Value: EventAcceptedCount", "EventAcceptedCount", - ((Attribute) attrList.get(2)).getName()); - Assert.assertEquals("Expected Value: 5", "5", - ((Attribute) attrList.get(2)).getValue().toString()); + objName = new ObjectName("org.apache.flume.source" + ":type=" + source.getName()); + + MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer(); + String[] strAtts = { "Type", "EventReceivedCount", "EventAcceptedCount" }; + AttributeList attrList = mbeanServer.getAttributes(objName, strAtts); + + Assert.assertNotNull(attrList.get(0)); + Assert.assertEquals("Expected Value: Type", "Type", + ((Attribute) attrList.get(0)).getName()); + Assert.assertEquals("Expected Value: SOURCE", "SOURCE", + ((Attribute) attrList.get(0)).getValue()); + + Assert.assertNotNull(attrList.get(1)); + Assert.assertEquals("Expected Value: EventReceivedCount", "EventReceivedCount", + ((Attribute) attrList.get(1)).getName()); + Assert.assertEquals("Expected Value: 5", "5", + ((Attribute) attrList.get(1)).getValue().toString()); + + Assert.assertNotNull(attrList.get(2)); + Assert.assertEquals("Expected Value: EventAcceptedCount", "EventAcceptedCount", + ((Attribute) attrList.get(2)).getName()); + Assert.assertEquals("Expected Value: 5", "5", + ((Attribute) attrList.get(2)).getValue().toString()); } catch (Exception ex) { - System.out.println("Unable to retreive the monitored counter: " - + objName + ex.getMessage()); + System.out.println("Unable to retreive the monitored counter: " + objName + ex.getMessage()); } } @Test - public void testBatchTimeout() throws InterruptedException, LifecycleException, - EventDeliveryException, IOException { + public void testBatchTimeout() + throws InterruptedException, LifecycleException, EventDeliveryException, IOException { String filePath = "/tmp/flume-execsource." + Thread.currentThread().getId(); String eventBody = "TestMessage"; @@ -296,12 +293,12 @@ public class TestExecSource { context.put(ExecSourceConfigurationConstants.CONFIG_BATCH_SIZE, "50000"); context.put(ExecSourceConfigurationConstants.CONFIG_BATCH_TIME_OUT, "750"); context.put("shell", SystemUtils.IS_OS_WINDOWS ? - "powershell -ExecutionPolicy Unrestricted -command" : - "/bin/bash -c"); + "powershell -ExecutionPolicy Unrestricted -command" : + "/bin/bash -c"); context.put("command", SystemUtils.IS_OS_WINDOWS ? - "Get-Content " + filePath + - " | Select-Object -Last 10" : - ("tail -f " + filePath)); + "Get-Content " + filePath + + " | Select-Object -Last 10" : + ("tail -f " + filePath)); Configurables.configure(source, context); source.start(); @@ -310,15 +307,15 @@ public class TestExecSource { transaction.begin(); for (int lineNumber = 0; lineNumber < 3; lineNumber++) { - outputStream.write((eventBody).getBytes()); - outputStream.write(String.valueOf(lineNumber).getBytes()); - outputStream.write('\n'); - outputStream.flush(); + outputStream.write((eventBody).getBytes()); + outputStream.write(String.valueOf(lineNumber).getBytes()); + outputStream.write('\n'); + outputStream.flush(); } outputStream.close(); Thread.sleep(1500); - for(int i = 0; i < 3; i++) { + for (int i = 0; i < 3; i++) { Event event = channel.take(); assertNotNull(event); assertNotNull(event.getBody()); @@ -332,45 +329,37 @@ public class TestExecSource { FileUtils.forceDelete(file); } - private void runTestShellCmdHelper(String shell, String command, String[] expectedOutput) - throws InterruptedException, LifecycleException, EventDeliveryException, IOException { - context.put("shell", shell); - context.put("command", command); - Configurables.configure(source, context); - source.start(); - // Some commands might take longer to complete, specially on Windows - // or on slow environments (e.g. Travis CI). - Thread.sleep(2500); - Transaction transaction = channel.getTransaction(); - transaction.begin(); - try { - List<String> output = Lists.newArrayList(); - Event event; - while ((event = channel.take()) != null) { - output.add(new String(event.getBody(), Charset.defaultCharset())); - } - transaction.commit(); -// System.out.println("command : " + command); -// System.out.println("output : "); -// for( String line : output ) -// System.out.println(line); - Assert.assertArrayEquals(expectedOutput, output.toArray(new String[]{})); - } finally { - transaction.close(); - source.stop(); + private void runTestShellCmdHelper(String shell, String command, String[] expectedOutput) + throws InterruptedException, LifecycleException, EventDeliveryException, IOException { + context.put("shell", shell); + context.put("command", command); + Configurables.configure(source, context); + source.start(); + // Some commands might take longer to complete, specially on Windows + // or on slow environments (e.g. Travis CI). + Thread.sleep(2500); + Transaction transaction = channel.getTransaction(); + transaction.begin(); + try { + List<String> output = Lists.newArrayList(); + Event event; + while ((event = channel.take()) != null) { + output.add(new String(event.getBody(), Charset.defaultCharset())); } + transaction.commit(); + Assert.assertArrayEquals(expectedOutput, output.toArray(new String[] {})); + } finally { + transaction.close(); + source.stop(); } - + } @Test public void testRestart() throws InterruptedException, LifecycleException, - EventDeliveryException, IOException { - + EventDeliveryException, IOException { context.put(ExecSourceConfigurationConstants.CONFIG_RESTART_THROTTLE, "10"); context.put(ExecSourceConfigurationConstants.CONFIG_RESTART, "true"); - - context.put("command", - SystemUtils.IS_OS_WINDOWS ? "cmd /c echo flume" : "echo flume"); + context.put("command", SystemUtils.IS_OS_WINDOWS ? "cmd /c echo flume" : "echo flume"); Configurables.configure(source, context); source.start(); @@ -380,7 +369,7 @@ public class TestExecSource { long start = System.currentTimeMillis(); - for(int i = 0; i < 5; i++) { + for (int i = 0; i < 5; i++) { Event event = channel.take(); assertNotNull(event); assertNotNull(event.getBody()); @@ -396,7 +385,6 @@ public class TestExecSource { source.stop(); } - /** * Tests to make sure that the shutdown mechanism works. There are races * in this test if the system has another sleep command running with the @@ -412,14 +400,12 @@ public class TestExecSource { boolean searchForCommand = true; while (searchForCommand) { searchForCommand = false; - String command = SystemUtils.IS_OS_WINDOWS ? ("cmd /c sleep " + seconds) : - ("sleep " + seconds); - String searchTxt = SystemUtils.IS_OS_WINDOWS ? ("sleep.exe") : - ("\b" + command + "\b"); + String command = SystemUtils.IS_OS_WINDOWS ? "cmd /c sleep " + seconds : "sleep " + seconds; + String searchTxt = SystemUtils.IS_OS_WINDOWS ? "sleep.exe" : "\b" + command + "\b"; Pattern pattern = Pattern.compile(searchTxt); for (String line : exec(SystemUtils.IS_OS_WINDOWS ? - "cmd /c tasklist /FI \"SESSIONNAME eq Console\"" : - "ps -ef")) { + "cmd /c tasklist /FI \"SESSIONNAME eq Console\"" : + "ps -ef")) { if (pattern.matcher(line).find()) { seconds++; searchForCommand = true; @@ -444,9 +430,9 @@ public class TestExecSource { source.stop(); Thread.sleep(1000L); for (String line : exec(SystemUtils.IS_OS_WINDOWS ? - "cmd /c tasklist /FI \"SESSIONNAME eq Console\"" : - "ps -ef")) { - if(pattern.matcher(line).find()) { + "cmd /c tasklist /FI \"SESSIONNAME eq Console\"" : + "ps -ef")) { + if (pattern.matcher(line).find()) { Assert.fail("Found [" + line + "]"); } } @@ -457,23 +443,21 @@ public class TestExecSource { Process process = new ProcessBuilder(commandArgs).start(); BufferedReader reader = null; try { - reader = new BufferedReader( - new InputStreamReader(process.getInputStream())); + reader = new BufferedReader(new InputStreamReader(process.getInputStream())); List<String> result = Lists.newArrayList(); String line; - while((line = reader.readLine()) != null) { + while ((line = reader.readLine()) != null) { result.add(line); } return result; } finally { process.destroy(); - if(reader != null) { + if (reader != null) { reader.close(); } int exit = process.waitFor(); - if(exit != 0) { - throw new IllegalStateException("Command [" + command + "] exited with " - + exit); + if (exit != 0) { + throw new IllegalStateException("Command [" + command + "] exited with " + exit); } } } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java index c3dc241..56c7881 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java @@ -58,6 +58,7 @@ import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import static org.mockito.Mockito.*; public class TestMultiportSyslogTCPSource { http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java index e11b4b6..99d413a 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java @@ -21,7 +21,11 @@ package org.apache.flume.source; import org.apache.commons.io.IOUtils; import org.apache.commons.io.LineIterator; -import org.apache.flume.*; +import org.apache.flume.Channel; +import org.apache.flume.ChannelSelector; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.Transaction; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.channel.ReplicatingChannelSelector; @@ -45,26 +49,27 @@ import java.util.ArrayList; import java.util.List; public class TestNetcatSource { - private static final Logger logger = LoggerFactory - .getLogger(TestAvroSource.class); + private static final Logger logger = + LoggerFactory.getLogger(TestAvroSource.class); /** * Five first sentences of the Fables "The Crow and the Fox" * written by Jean de La Fontaine, French poet. * - * @see <a href="http://en.wikipedia.org/wiki/Jean_de_La_Fontaine">Jean de La Fontaine on wikipedia</a> + * @see <a href="http://en.wikipedia.org/wiki/Jean_de_La_Fontaine">Jean de La Fontaine on + * wikipedia</a> */ private final String french = "Maître Corbeau, sur un arbre perché, " + - "Tenait en son bec un fromage. " + - "Maître Renard, par l'odeur alléché, " + - "Lui tint à peu près ce langage : " + - "Et bonjour, Monsieur du Corbeau,"; + "Tenait en son bec un fromage. " + + "Maître Renard, par l'odeur alléché, " + + "Lui tint à peu près ce langage : " + + "Et bonjour, Monsieur du Corbeau,"; private final String english = "At the top of a tree perched Master Crow; " + - "In his beak he was holding a cheese. " + - "Drawn by the smell, Master Fox spoke, below. " + - "The words, more or less, were these: " + - "\"Hey, now, Sir Crow! Good day, good day!"; + "In his beak he was holding a cheese. " + + "Drawn by the smell, Master Fox spoke, below. " + + "The words, more or less, were these: " + + "\"Hey, now, Sir Crow! Good day, good day!"; private int selectedPort; private NetcatSource source; @@ -109,12 +114,14 @@ public class TestNetcatSource { // Test on english text snippet for (int i = 0; i < 20; i++) { sendEvent(netcatSocket, english, encoding); - Assert.assertArrayEquals("Channel contained our event", english.getBytes(defaultCharset), getFlumeEvent()); + Assert.assertArrayEquals("Channel contained our event", english.getBytes(defaultCharset), + getFlumeEvent()); } // Test on french text snippet for (int i = 0; i < 20; i++) { sendEvent(netcatSocket, french, encoding); - Assert.assertArrayEquals("Channel contained our event", french.getBytes(defaultCharset), getFlumeEvent()); + Assert.assertArrayEquals("Channel contained our event", french.getBytes(defaultCharset), + getFlumeEvent()); } } finally { netcatSocket.close(); @@ -137,12 +144,14 @@ public class TestNetcatSource { // Test on english text snippet for (int i = 0; i < 20; i++) { sendEvent(netcatSocket, english, encoding); - Assert.assertArrayEquals("Channel contained our event", english.getBytes(defaultCharset), getFlumeEvent()); + Assert.assertArrayEquals("Channel contained our event", english.getBytes(defaultCharset), + getFlumeEvent()); } // Test on french text snippet for (int i = 0; i < 20; i++) { sendEvent(netcatSocket, french, encoding); - Assert.assertArrayEquals("Channel contained our event", french.getBytes(defaultCharset), getFlumeEvent()); + Assert.assertArrayEquals("Channel contained our event", french.getBytes(defaultCharset), + getFlumeEvent()); } } finally { netcatSocket.close(); @@ -165,12 +174,14 @@ public class TestNetcatSource { // Test on english text snippet for (int i = 0; i < 20; i++) { sendEvent(netcatSocket, english, encoding); - Assert.assertArrayEquals("Channel contained our event", english.getBytes(defaultCharset), getFlumeEvent()); + Assert.assertArrayEquals("Channel contained our event", english.getBytes(defaultCharset), + getFlumeEvent()); } // Test on french text snippet for (int i = 0; i < 20; i++) { sendEvent(netcatSocket, french, encoding); - Assert.assertArrayEquals("Channel contained our event", french.getBytes(defaultCharset), getFlumeEvent()); + Assert.assertArrayEquals("Channel contained our event", french.getBytes(defaultCharset), + getFlumeEvent()); } } finally { netcatSocket.close(); @@ -193,12 +204,14 @@ public class TestNetcatSource { // Test on english text snippet for (int i = 0; i < 20; i++) { sendEvent(netcatSocket, english, encoding); - Assert.assertArrayEquals("Channel contained our event", english.getBytes(defaultCharset), getFlumeEvent()); + Assert.assertArrayEquals("Channel contained our event", english.getBytes(defaultCharset), + getFlumeEvent()); } // Test on french text snippet for (int i = 0; i < 20; i++) { sendEvent(netcatSocket, french, encoding); - Assert.assertArrayEquals("Channel contained our event", french.getBytes(defaultCharset), getFlumeEvent()); + Assert.assertArrayEquals("Channel contained our event", french.getBytes(defaultCharset), + getFlumeEvent()); } } finally { netcatSocket.close(); @@ -223,13 +236,15 @@ public class TestNetcatSource { // Test on english text snippet for (int i = 0; i < 20; i++) { sendEvent(netcatSocket, english, encoding); - Assert.assertArrayEquals("Channel contained our event", english.getBytes(defaultCharset), getFlumeEvent()); + Assert.assertArrayEquals("Channel contained our event", english.getBytes(defaultCharset), + getFlumeEvent()); Assert.assertEquals("Socket contained the Ack", ackEvent, inputLineIterator.nextLine()); } // Test on french text snippet for (int i = 0; i < 20; i++) { sendEvent(netcatSocket, french, encoding); - Assert.assertArrayEquals("Channel contained our event", french.getBytes(defaultCharset), getFlumeEvent()); + Assert.assertArrayEquals("Channel contained our event", french.getBytes(defaultCharset), + getFlumeEvent()); Assert.assertEquals("Socket contained the Ack", ackEvent, inputLineIterator.nextLine()); } } finally { @@ -251,7 +266,8 @@ public class TestNetcatSource { Socket netcatSocket = new Socket(localhost, selectedPort); try { sendEvent(netcatSocket, "123456789", encoding); - Assert.assertArrayEquals("Channel contained our event", "123456789".getBytes(defaultCharset), getFlumeEvent()); + Assert.assertArrayEquals("Channel contained our event", + "123456789".getBytes(defaultCharset), getFlumeEvent()); sendEvent(netcatSocket, english, encoding); Assert.assertEquals("Channel does not contain an event", null, getRawFlumeEvent()); } finally { @@ -276,18 +292,21 @@ public class TestNetcatSource { LineIterator inputLineIterator = IOUtils.lineIterator(netcatSocket.getInputStream(), encoding); try { sendEvent(netcatSocket, "123456789", encoding); - Assert.assertArrayEquals("Channel contained our event", "123456789".getBytes(defaultCharset), getFlumeEvent()); + Assert.assertArrayEquals("Channel contained our event", + "123456789".getBytes(defaultCharset), getFlumeEvent()); Assert.assertEquals("Socket contained the Ack", ackEvent, inputLineIterator.nextLine()); sendEvent(netcatSocket, english, encoding); Assert.assertEquals("Channel does not contain an event", null, getRawFlumeEvent()); - Assert.assertEquals("Socket contained the Error Ack", ackErrorEvent, inputLineIterator.nextLine()); + Assert.assertEquals("Socket contained the Error Ack", ackErrorEvent, inputLineIterator + .nextLine()); } finally { netcatSocket.close(); stopSource(); } } - private void startSource(String encoding, String ack, String batchSize, String maxLineLength) throws InterruptedException { + private void startSource(String encoding, String ack, String batchSize, String maxLineLength) + throws InterruptedException { boolean bound = false; for (int i = 0; i < 100 && !bound; i++) { @@ -313,9 +332,9 @@ public class TestNetcatSource { } Assert.assertTrue("Reached start or error", - LifecycleController.waitForOneOf(source, LifecycleState.START_OR_ERROR)); + LifecycleController.waitForOneOf(source, LifecycleState.START_OR_ERROR)); Assert.assertEquals("Server is started", LifecycleState.START, - source.getLifecycleState()); + source.getLifecycleState()); } private void sendEvent(Socket socket, String content, String encoding) throws IOException { @@ -366,9 +385,9 @@ public class TestNetcatSource { private void stopSource() throws InterruptedException { source.stop(); Assert.assertTrue("Reached stop or error", - LifecycleController.waitForOneOf(source, LifecycleState.STOP_OR_ERROR)); + LifecycleController.waitForOneOf(source, LifecycleState.STOP_OR_ERROR)); Assert.assertEquals("Server is stopped", LifecycleState.STOP, - source.getLifecycleState()); + source.getLifecycleState()); logger.info("Source stopped"); } } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java index 2bbcdaf..5d6cc29 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java @@ -18,9 +18,6 @@ */ package org.apache.flume.source; -import java.util.ArrayList; -import java.util.List; - import org.apache.flume.Channel; import org.apache.flume.ChannelSelector; import org.apache.flume.Context; @@ -36,6 +33,9 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.util.ArrayList; +import java.util.List; + public class TestSequenceGeneratorSource { private PollableSource source; @@ -105,7 +105,7 @@ public class TestSequenceGeneratorSource { for (long j = batchSize; j > 0; j--) { Event event = channel.take(); - String expectedVal = String.valueOf(((i+1)*batchSize)-j); + String expectedVal = String.valueOf(((i + 1) * batchSize) - j); String resultedVal = new String(event.getBody()); Assert.assertTrue("Expected " + expectedVal + " is not equals to " + resultedVal, expectedVal.equals(resultedVal)); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java index 47fdc7a..82c5351 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java @@ -17,15 +17,9 @@ package org.apache.flume.source; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.TimeUnit; - +import com.google.common.base.Charsets; import com.google.common.collect.Lists; +import com.google.common.io.Files; import org.apache.flume.Channel; import org.apache.flume.ChannelSelector; import org.apache.flume.Context; @@ -42,8 +36,13 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import com.google.common.base.Charsets; -import com.google.common.io.Files; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; public class TestSpoolDirectorySource { static SpoolDirectorySource source; @@ -151,14 +150,13 @@ public class TestSpoolDirectorySource { * Tests if SpoolDirectorySource sets basename headers on events correctly */ @Test - public void testPutBasenameHeader() throws IOException, - InterruptedException { + public void testPutBasenameHeader() throws IOException, InterruptedException { Context context = new Context(); File f1 = new File(tmpDir.getAbsolutePath() + "/file1"); Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" + - "file1line5\nfile1line6\nfile1line7\nfile1line8\n", - f1, Charsets.UTF_8); + "file1line5\nfile1line6\nfile1line7\nfile1line8\n", + f1, Charsets.UTF_8); context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, tmpDir.getAbsolutePath()); @@ -179,7 +177,7 @@ public class TestSpoolDirectorySource { Assert.assertNotNull("Event headers must not be null", e.getHeaders()); Assert.assertNotNull(e.getHeaders().get("basenameHeaderKeyTest")); Assert.assertEquals(f1.getName(), - e.getHeaders().get("basenameHeaderKeyTest")); + e.getHeaders().get("basenameHeaderKeyTest")); txn.commit(); txn.close(); } @@ -233,7 +231,7 @@ public class TestSpoolDirectorySource { baos.write(e.getBody()); baos.write('\n'); // newline characters are consumed in the process e = channel.take(); - } while(e != null); + } while (e != null); Assert.assertEquals("Event body is correct", Arrays.toString(origBody.getBytes()), @@ -371,7 +369,7 @@ public class TestSpoolDirectorySource { context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, - tmpDir.getAbsolutePath()); + tmpDir.getAbsolutePath()); context.put(SpoolDirectorySourceConfigurationConstants.BATCH_SIZE, "2"); Configurables.configure(source, context); @@ -379,7 +377,7 @@ public class TestSpoolDirectorySource { source.start(); // Wait for the source to read enough events to fill up the channel. - while(!source.hitChannelException()) { + while (!source.hitChannelException()) { Thread.sleep(50); } @@ -402,7 +400,7 @@ public class TestSpoolDirectorySource { tx.close(); } Assert.assertTrue("Expected to hit ChannelException, but did not!", - source.hitChannelException()); + source.hitChannelException()); Assert.assertEquals(8, dataOut.size()); source.stop(); } @@ -424,7 +422,7 @@ public class TestSpoolDirectorySource { Files.touch(f4); context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, - tmpDir.getAbsolutePath()); + tmpDir.getAbsolutePath()); Configurables.configure(source, context); source.start(); @@ -432,8 +430,8 @@ public class TestSpoolDirectorySource { Thread.sleep(5000); Assert.assertFalse("Server did not error", source.hasFatalError()); - Assert.assertEquals("One message was read", 1, - source.getSourceCounter().getEventAcceptedCount()); + Assert.assertEquals("One message was read", + 1, source.getSourceCounter().getEventAcceptedCount()); source.stop(); } } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java index a651281..06e663c 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java @@ -101,7 +101,7 @@ public class TestStressSource { } if (i < 3) { verify(mockProcessor, - times(i+1)).processEventBatch(getLastProcessedEventList(source)); + times(i + 1)).processEventBatch(getLastProcessedEventList(source)); } else { verify(mockProcessor, times(1)).processEventBatch(getLastProcessedEventList(source)); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java index 265157e..7edb9b7 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java @@ -20,17 +20,17 @@ package org.apache.flume.source; import com.google.common.base.Charsets; import com.google.common.collect.Lists; -import java.nio.charset.Charset; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - import org.apache.flume.Event; import org.joda.time.format.DateTimeFormatter; import org.joda.time.format.ISODateTimeFormat; import org.junit.Assert; import org.junit.Test; +import java.nio.charset.Charset; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + public class TestSyslogParser { @Test public void testRfc5424DateParsing() { @@ -84,7 +84,7 @@ public class TestSyslogParser { Set<String> keepFields = new HashSet<String>(); Event event = parser.parseMessage(msg, charset, keepFields); Assert.assertNull("Failure to parse known-good syslog message", - event.getHeaders().get(SyslogUtils.EVENT_STATUS)); + event.getHeaders().get(SyslogUtils.EVENT_STATUS)); } // test that priority, timestamp and hostname are preserved in event body http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java index 239ba51..10ef8d8 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java @@ -41,7 +41,7 @@ import java.util.List; public class TestSyslogTcpSource { private static final org.slf4j.Logger logger = - LoggerFactory.getLogger(TestSyslogTcpSource.class); + LoggerFactory.getLogger(TestSyslogTcpSource.class); private SyslogTcpSource source; private Channel channel; private static final int TEST_SYSLOG_PORT = 0; @@ -56,7 +56,7 @@ public class TestSyslogTcpSource { private final String bodyWithTandH = "<10>" + stamp1 + " " + host1 + " " + data1 + "\n"; - private void init(String keepFields){ + private void init(String keepFields) { source = new SyslogTcpSource(); channel = new MemoryChannel(); @@ -116,7 +116,7 @@ public class TestSyslogTcpSource { logger.info(str); if (keepFields.equals("true") || keepFields.equals("all")) { Assert.assertArrayEquals(bodyWithTandH.trim().getBytes(), - e.getBody()); + e.getBody()); } else if (keepFields.equals("false") || keepFields.equals("none")) { Assert.assertArrayEquals(data1.getBytes(), e.getBody()); } else if (keepFields.equals("hostname")) { @@ -136,7 +136,7 @@ public class TestSyslogTcpSource { } @Test - public void testRemoveFields() throws IOException{ + public void testRemoveFields() throws IOException { runKeepFieldsTest("none"); // Backwards compatibility @@ -144,12 +144,12 @@ public class TestSyslogTcpSource { } @Test - public void testKeepHostname() throws IOException{ + public void testKeepHostname() throws IOException { runKeepFieldsTest("hostname"); } @Test - public void testKeepTimestamp() throws IOException{ + public void testKeepTimestamp() throws IOException { runKeepFieldsTest("timestamp"); } } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java index 8fc80be..e5b7a06 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java @@ -18,14 +18,7 @@ */ package org.apache.flume.source; -import java.util.ArrayList; -import java.util.List; -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.InetAddress; -import java.net.DatagramSocket; import com.google.common.base.Charsets; -import com.google.common.base.Strings; import org.apache.flume.Channel; import org.apache.flume.ChannelSelector; import org.apache.flume.Context; @@ -40,10 +33,16 @@ import org.junit.Assert; import org.junit.Test; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.List; public class TestSyslogUdpSource { private static final org.slf4j.Logger logger = - LoggerFactory.getLogger(TestSyslogUdpSource.class); + LoggerFactory.getLogger(TestSyslogUdpSource.class); private SyslogUDPSource source; private Channel channel; private static final int TEST_SYSLOG_PORT = 0; @@ -192,12 +191,12 @@ public class TestSyslogUdpSource { } @Test - public void testKeepHostname() throws IOException{ + public void testKeepHostname() throws IOException { runKeepFieldsTest("hostname"); } @Test - public void testKeepTimestamp() throws IOException{ + public void testKeepTimestamp() throws IOException { runKeepFieldsTest("timestamp"); } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java index 1c005ff..80d8dac 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java @@ -18,7 +18,6 @@ */ package org.apache.flume.source; - import org.apache.flume.Event; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; @@ -41,7 +40,7 @@ public class TestSyslogUtils { String host1 = "ubuntu-11.cloudera.com"; String data1 = "some msg"; // timestamp with hh:mm format timezone with no version - String msg1 = "<10>" + stamp1+ "+08:00" + " " + host1 + " " + data1 + "\n"; + String msg1 = "<10>" + stamp1 + "+08:00" + " " + host1 + " " + data1 + "\n"; checkHeader(msg1, stamp1 + "+0800", format1, host1, data1); } @@ -62,7 +61,7 @@ public class TestSyslogUtils { String host1 = "ubuntu-11.cloudera.com"; String data1 = "some msg"; // timestamp with 'Z' appended, translates to UTC - String msg1 = "<10>1 " + stamp1+ "Z" + " " + host1 + " " + data1 + "\n"; + String msg1 = "<10>1 " + stamp1 + "Z" + " " + host1 + " " + data1 + "\n"; checkHeader(msg1, stamp1 + "+0000", format1, host1, data1); } @@ -73,13 +72,13 @@ public class TestSyslogUtils { String host1 = "ubuntu-11.cloudera.com"; String data1 = "some msg"; // timestamp with hh:mm format timezone - String msg1 = "<10>1 " + stamp1+ "+08:00" + " " + host1 + " " + data1 + "\n"; + String msg1 = "<10>1 " + stamp1 + "+08:00" + " " + host1 + " " + data1 + "\n"; checkHeader(msg1, stamp1 + "+0800", format1, host1, data1); } @Test public void TestHeader4() throws ParseException { - String host1 = "ubuntu-11.cloudera.com"; + String host1 = "ubuntu-11.cloudera.com"; String data1 = "some msg"; // null format timestamp (-) String msg1 = "<10>1 " + "-" + " " + host1 + " " + data1 + "\n"; @@ -104,7 +103,7 @@ public class TestSyslogUtils { String host1 = "-"; String data1 = "some msg"; // null host - String msg1 = "<10>1 " + stamp1+ "Z" + " " + host1 + " " + data1 + "\n"; + String msg1 = "<10>1 " + stamp1 + "Z" + " " + host1 + " " + data1 + "\n"; checkHeader(msg1, stamp1 + "+0000", format1, null, data1); } @@ -115,7 +114,7 @@ public class TestSyslogUtils { String host1 = "-"; String data1 = "some msg"; // null host - String msg1 = "<10>1 " + stamp1+ "+08:00" + " " + host1 + " " + data1 + "\n"; + String msg1 = "<10>1 " + stamp1 + "+08:00" + " " + host1 + " " + data1 + "\n"; checkHeader(msg1, stamp1 + "+0800", format1, null, data1); } @@ -141,8 +140,7 @@ public class TestSyslogUtils { String data1 = "some msg"; // timestamp with 'Z' appended, translates to UTC String msg1 = "<10>" + stamp1 + " " + host1 + " " + data1 + "\n"; - checkHeader(msg1, year + stamp1, - format1, host1, data1); + checkHeader(msg1, year + stamp1, format1, host1, data1); } @Test @@ -157,8 +155,7 @@ public class TestSyslogUtils { String data1 = "some msg"; // timestamp with 'Z' appended, translates to UTC String msg1 = "<10>" + stamp1 + " " + host1 + " " + data1 + "\n"; - checkHeader(msg1, year + stamp1, - format1, host1, data1); + checkHeader(msg1, year + stamp1, format1, host1, data1); } @Test @@ -187,21 +184,19 @@ public class TestSyslogUtils { String host1 = "ubuntu-11.cloudera.com"; String data1 = "- hyphen_null_breaks_5424_pattern [07/Jun/2012:14:46:44 -0600]"; String msg1 = "<10>" + stamp1 + " " + host1 + " " + data1 + "\n"; - checkHeader(msg1, year + stamp1, - format1, host1, data1); + checkHeader(msg1, year + stamp1, format1, host1, data1); } + /* This test creates a series of dates that range from 10 months in the past to (5 days short + * of) one month in the future. This tests that the year addition code is clever enough to + * handle scenarios where the event received was generated in a different year to what flume + * considers to be "current" (e.g. where there has been some lag somewhere, especially when + * flicking over on New Year's eve, or when you are about to flick over and the flume's + * system clock is slightly slower than the Syslog source's clock). + */ @Test public void TestRfc3164Dates() throws ParseException { - /* - * This test creates a series of dates that range from 10 months in the past to (5 days short of) - * one month in the future. This tests that the year addition code is clever enough to handle scenarios - * where the event received was generated in a different year to what flume considers to be "current" - * (e.g. where there has been some lag somewhere, especially when flicking over on New Year's eve, or - * when you are about to flick over and the flume's system clock is slightly slower than the Syslog - * source's clock). - */ - for (int i=-10; i<=1; i++) { + for (int i = -10; i <= 1; i++) { SimpleDateFormat sdf = new SimpleDateFormat("MMM d hh:MM:ss"); Date date = new Date(System.currentTimeMillis()); Calendar cal = Calendar.getInstance(); @@ -210,7 +205,7 @@ public class TestSyslogUtils { //Small tweak to avoid the 1 month in the future ticking over by a few seconds between now //and when the checkHeader actually runs - if (i==1) { + if (i == 1) { cal.add(Calendar.DAY_OF_MONTH, -1); } @@ -223,8 +218,7 @@ public class TestSyslogUtils { // timestamp with 'Z' appended, translates to UTC String msg1 = "<10>" + stamp1 + " " + host1 + " " + data1 + "\n"; - checkHeader(msg1, year + stamp1, - format1, host1, data1); + checkHeader(msg1, year + stamp1, format1, host1, data1); } } @@ -234,16 +228,15 @@ public class TestSyslogUtils { if (keepFields == null || keepFields.isEmpty()) { util = new SyslogUtils(SyslogUtils.DEFAULT_SIZE, new HashSet<String>(), false); } else { - util = new SyslogUtils( - SyslogUtils.DEFAULT_SIZE, - SyslogUtils.chooseFieldsToKeep(keepFields), - false); + util = new SyslogUtils(SyslogUtils.DEFAULT_SIZE, + SyslogUtils.chooseFieldsToKeep(keepFields), + false); } ChannelBuffer buff = ChannelBuffers.buffer(200); buff.writeBytes(msg1.getBytes()); Event e = util.extractEvent(buff); - if(e == null){ + if (e == null) { throw new NullPointerException("Event is null"); } Map<String, String> headers2 = e.getHeaders(); @@ -251,13 +244,14 @@ public class TestSyslogUtils { Assert.assertFalse(headers2.containsKey("timestamp")); } else { SimpleDateFormat formater = new SimpleDateFormat(format1, Locale.ENGLISH); - Assert.assertEquals(String.valueOf(formater.parse(stamp1).getTime()), headers2.get("timestamp")); + Assert.assertEquals(String.valueOf(formater.parse(stamp1).getTime()), + headers2.get("timestamp")); } if (host1 == null) { Assert.assertFalse(headers2.containsKey("host")); } else { String host2 = headers2.get("host"); - Assert.assertEquals(host2,host1); + Assert.assertEquals(host2, host1); } Assert.assertEquals(data1, new String(e.getBody())); } @@ -278,22 +272,20 @@ public class TestSyslogUtils { ChannelBuffer buff = ChannelBuffers.buffer(100); buff.writeBytes(badData1.getBytes()); Event e = util.extractEvent(buff); - if(e == null){ + if (e == null) { throw new NullPointerException("Event is null"); } Map<String, String> headers = e.getHeaders(); Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_FACILITY)); Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_SEVERITY)); Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(), - headers.get(SyslogUtils.EVENT_STATUS)); + headers.get(SyslogUtils.EVENT_STATUS)); Assert.assertEquals(badData1.trim(), new String(e.getBody()).trim()); - } /** * Test bad event format 2: The first char is not < */ - @Test public void testExtractBadEvent2() { String badData1 = "hi guys! <10> bad bad data\n"; @@ -301,22 +293,20 @@ public class TestSyslogUtils { ChannelBuffer buff = ChannelBuffers.buffer(100); buff.writeBytes(badData1.getBytes()); Event e = util.extractEvent(buff); - if(e == null){ + if (e == null) { throw new NullPointerException("Event is null"); } Map<String, String> headers = e.getHeaders(); Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_FACILITY)); Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_SEVERITY)); Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(), - headers.get(SyslogUtils.EVENT_STATUS)); + headers.get(SyslogUtils.EVENT_STATUS)); Assert.assertEquals(badData1.trim(), new String(e.getBody()).trim()); - } /** * Test bad event format 3: Empty priority - <> */ - @Test public void testExtractBadEvent3() { String badData1 = "<> bad bad data\n"; @@ -324,22 +314,20 @@ public class TestSyslogUtils { ChannelBuffer buff = ChannelBuffers.buffer(100); buff.writeBytes(badData1.getBytes()); Event e = util.extractEvent(buff); - if(e == null){ + if (e == null) { throw new NullPointerException("Event is null"); } Map<String, String> headers = e.getHeaders(); Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_FACILITY)); Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_SEVERITY)); Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(), - headers.get(SyslogUtils.EVENT_STATUS)); + headers.get(SyslogUtils.EVENT_STATUS)); Assert.assertEquals(badData1.trim(), new String(e.getBody()).trim()); - } /** * Test bad event format 4: Priority too long */ - @Test public void testExtractBadEvent4() { String badData1 = "<123123123123123123123123123123> bad bad data\n"; @@ -347,16 +335,15 @@ public class TestSyslogUtils { ChannelBuffer buff = ChannelBuffers.buffer(100); buff.writeBytes(badData1.getBytes()); Event e = util.extractEvent(buff); - if(e == null){ + if (e == null) { throw new NullPointerException("Event is null"); } Map<String, String> headers = e.getHeaders(); Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_FACILITY)); Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_SEVERITY)); Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(), - headers.get(SyslogUtils.EVENT_STATUS)); + headers.get(SyslogUtils.EVENT_STATUS)); Assert.assertEquals(badData1.trim(), new String(e.getBody()).trim()); - } /** @@ -368,9 +355,9 @@ public class TestSyslogUtils { String goodData1 = "Good good good data\n"; SyslogUtils util = new SyslogUtils(false); ChannelBuffer buff = ChannelBuffers.buffer(100); - buff.writeBytes((priority+goodData1).getBytes()); + buff.writeBytes((priority + goodData1).getBytes()); Event e = util.extractEvent(buff); - if(e == null){ + if (e == null) { throw new NullPointerException("Event is null"); } Map<String, String> headers = e.getHeaders(); @@ -378,86 +365,81 @@ public class TestSyslogUtils { Assert.assertEquals("2", headers.get(SyslogUtils.SYSLOG_SEVERITY)); Assert.assertEquals(null, headers.get(SyslogUtils.EVENT_STATUS)); Assert.assertEquals(priority + goodData1.trim(), - new String(e.getBody()).trim()); - + new String(e.getBody()).trim()); } /** * Bad event immediately followed by a good event */ @Test - public void testBadEventGoodEvent(){ + public void testBadEventGoodEvent() { String badData1 = "hi guys! <10F> bad bad data\n"; SyslogUtils util = new SyslogUtils(false); ChannelBuffer buff = ChannelBuffers.buffer(100); buff.writeBytes(badData1.getBytes()); String priority = "<10>"; String goodData1 = "Good good good data\n"; - buff.writeBytes((priority+goodData1).getBytes()); + buff.writeBytes((priority + goodData1).getBytes()); Event e = util.extractEvent(buff); - if(e == null){ + if (e == null) { throw new NullPointerException("Event is null"); } Map<String, String> headers = e.getHeaders(); Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_FACILITY)); Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_SEVERITY)); Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(), - headers.get(SyslogUtils.EVENT_STATUS)); - Assert.assertEquals(badData1.trim(), new String(e.getBody()) - .trim()); + headers.get(SyslogUtils.EVENT_STATUS)); + Assert.assertEquals(badData1.trim(), new String(e.getBody()).trim()); Event e2 = util.extractEvent(buff); - if(e2 == null){ + if (e2 == null) { throw new NullPointerException("Event is null"); } Map<String, String> headers2 = e2.getHeaders(); Assert.assertEquals("1", headers2.get(SyslogUtils.SYSLOG_FACILITY)); Assert.assertEquals("2", headers2.get(SyslogUtils.SYSLOG_SEVERITY)); - Assert.assertEquals(null, - headers2.get(SyslogUtils.EVENT_STATUS)); - Assert.assertEquals(priority + goodData1.trim(), - new String(e2.getBody()).trim()); + Assert.assertEquals(null, headers2.get(SyslogUtils.EVENT_STATUS)); + Assert.assertEquals(priority + goodData1.trim(), new String(e2.getBody()).trim()); } @Test - public void testGoodEventBadEvent(){ + public void testGoodEventBadEvent() { String badData1 = "hi guys! <10F> bad bad data\n"; String priority = "<10>"; String goodData1 = "Good good good data\n"; SyslogUtils util = new SyslogUtils(false); ChannelBuffer buff = ChannelBuffers.buffer(100); - buff.writeBytes((priority+goodData1).getBytes()); + buff.writeBytes((priority + goodData1).getBytes()); buff.writeBytes(badData1.getBytes()); Event e2 = util.extractEvent(buff); - if(e2 == null){ + if (e2 == null) { throw new NullPointerException("Event is null"); } Map<String, String> headers2 = e2.getHeaders(); Assert.assertEquals("1", headers2.get(SyslogUtils.SYSLOG_FACILITY)); Assert.assertEquals("2", headers2.get(SyslogUtils.SYSLOG_SEVERITY)); Assert.assertEquals(null, - headers2.get(SyslogUtils.EVENT_STATUS)); + headers2.get(SyslogUtils.EVENT_STATUS)); Assert.assertEquals(priority + goodData1.trim(), - new String(e2.getBody()).trim()); + new String(e2.getBody()).trim()); Event e = util.extractEvent(buff); - if(e == null){ + if (e == null) { throw new NullPointerException("Event is null"); } Map<String, String> headers = e.getHeaders(); Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_FACILITY)); Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_SEVERITY)); Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(), - headers.get(SyslogUtils.EVENT_STATUS)); + headers.get(SyslogUtils.EVENT_STATUS)); Assert.assertEquals(badData1.trim(), new String(e.getBody()).trim()); - } @Test - public void testBadEventBadEvent(){ + public void testBadEventBadEvent() { String badData1 = "hi guys! <10F> bad bad data\n"; SyslogUtils util = new SyslogUtils(false); ChannelBuffer buff = ChannelBuffers.buffer(100); @@ -466,65 +448,63 @@ public class TestSyslogUtils { buff.writeBytes((badData2).getBytes()); Event e = util.extractEvent(buff); - if(e == null){ + if (e == null) { throw new NullPointerException("Event is null"); } Map<String, String> headers = e.getHeaders(); Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_FACILITY)); Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_SEVERITY)); Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(), - headers.get(SyslogUtils.EVENT_STATUS)); + headers.get(SyslogUtils.EVENT_STATUS)); Assert.assertEquals(badData1.trim(), new String(e.getBody()).trim()); Event e2 = util.extractEvent(buff); - if(e2 == null){ + if (e2 == null) { throw new NullPointerException("Event is null"); } Map<String, String> headers2 = e2.getHeaders(); Assert.assertEquals("0", headers2.get(SyslogUtils.SYSLOG_FACILITY)); Assert.assertEquals("0", headers2.get(SyslogUtils.SYSLOG_SEVERITY)); Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(), - headers2.get(SyslogUtils.EVENT_STATUS)); + headers2.get(SyslogUtils.EVENT_STATUS)); Assert.assertEquals(badData2.trim(), new String(e2.getBody()).trim()); } @Test public void testGoodEventGoodEvent() { - String priority = "<10>"; String goodData1 = "Good good good data\n"; SyslogUtils util = new SyslogUtils(false); ChannelBuffer buff = ChannelBuffers.buffer(100); - buff.writeBytes((priority+goodData1).getBytes()); + buff.writeBytes((priority + goodData1).getBytes()); String priority2 = "<20>"; String goodData2 = "Good really good data\n"; - buff.writeBytes((priority2+goodData2).getBytes()); + buff.writeBytes((priority2 + goodData2).getBytes()); Event e = util.extractEvent(buff); - if(e == null){ + if (e == null) { throw new NullPointerException("Event is null"); } Map<String, String> headers = e.getHeaders(); Assert.assertEquals("1", headers.get(SyslogUtils.SYSLOG_FACILITY)); Assert.assertEquals("2", headers.get(SyslogUtils.SYSLOG_SEVERITY)); Assert.assertEquals(null, - headers.get(SyslogUtils.EVENT_STATUS)); + headers.get(SyslogUtils.EVENT_STATUS)); Assert.assertEquals(priority + goodData1.trim(), - new String(e.getBody()).trim()); + new String(e.getBody()).trim()); Event e2 = util.extractEvent(buff); - if(e2 == null){ + if (e2 == null) { throw new NullPointerException("Event is null"); } Map<String, String> headers2 = e2.getHeaders(); Assert.assertEquals("2", headers2.get(SyslogUtils.SYSLOG_FACILITY)); Assert.assertEquals("4", headers2.get(SyslogUtils.SYSLOG_SEVERITY)); Assert.assertEquals(null, - headers.get(SyslogUtils.EVENT_STATUS)); + headers.get(SyslogUtils.EVENT_STATUS)); Assert.assertEquals(priority2 + goodData2.trim(), - new String(e2.getBody()).trim()); - + new String(e2.getBody()).trim()); } @Test @@ -535,28 +515,27 @@ public class TestSyslogUtils { ChannelBuffer buff = ChannelBuffers.buffer(100); buff.writeBytes(badData1.getBytes()); Event e = util.extractEvent(buff); - if(e == null){ + if (e == null) { throw new NullPointerException("Event is null"); } Map<String, String> headers = e.getHeaders(); Assert.assertEquals("1", headers.get(SyslogUtils.SYSLOG_FACILITY)); Assert.assertEquals("2", headers.get(SyslogUtils.SYSLOG_SEVERITY)); Assert.assertEquals(SyslogUtils.SyslogStatus.INCOMPLETE.getSyslogStatus(), - headers.get(SyslogUtils.EVENT_STATUS)); + headers.get(SyslogUtils.EVENT_STATUS)); Assert.assertEquals("<10> bad b".trim(), new String(e.getBody()).trim()); Event e2 = util.extractEvent(buff); - if(e2 == null){ + if (e2 == null) { throw new NullPointerException("Event is null"); } Map<String, String> headers2 = e2.getHeaders(); Assert.assertEquals("0", headers2.get(SyslogUtils.SYSLOG_FACILITY)); Assert.assertEquals("0", headers2.get(SyslogUtils.SYSLOG_SEVERITY)); Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(), - headers2.get(SyslogUtils.EVENT_STATUS)); + headers2.get(SyslogUtils.EVENT_STATUS)); Assert.assertEquals("ad data ba".trim(), new String(e2.getBody()).trim()); - } @Test @@ -580,7 +559,8 @@ public class TestSyslogUtils { checkHeader("priority timestamp hostname", msg1, stamp1 + "+0800", format1, host1, data4); String data5 = "<10>1 2012-04-13T11:11:11+08:00 ubuntu-11.cloudera.com some msg"; - checkHeader("priority version timestamp hostname", msg1, stamp1 + "+0800", format1, host1, data5); + checkHeader("priority version timestamp hostname", msg1, stamp1 + "+0800", + format1, host1, data5); checkHeader("all", msg1, stamp1 + "+0800", format1, host1, data5); checkHeader("true", msg1, stamp1 + "+0800", format1, host1, data5); } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java index 3d2901a..cdaefaf 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java @@ -67,10 +67,9 @@ public class TestThriftSource { port = random.nextInt(50000) + 1024; props.clear(); props.setProperty("hosts", "h1"); - props.setProperty("hosts.h1", "0.0.0.0:"+ String.valueOf(port)); + props.setProperty("hosts.h1", "0.0.0.0:" + String.valueOf(port)); props.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, "10"); - props.setProperty(RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT, - "2000"); + props.setProperty(RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT, "2000"); channel = new MemoryChannel(); source = new ThriftSource(); } @@ -110,7 +109,7 @@ public class TestThriftSource { context.put("keymanager-type", KeyManagerFactory.getDefaultAlgorithm()); Configurables.configure(source, context); source.start(); - for(int i = 0; i < 30; i++) { + for (int i = 0; i < 30; i++) { client.append(EventBuilder.withBody(String.valueOf(i).getBytes())); } Transaction transaction = channel.getTransaction(); @@ -135,7 +134,7 @@ public class TestThriftSource { context.put(ThriftSource.CONFIG_PORT, String.valueOf(port)); Configurables.configure(source, context); source.start(); - for(int i = 0; i < 30; i++) { + for (int i = 0; i < 30; i++) { client.append(EventBuilder.withBody(String.valueOf(i).getBytes())); } Transaction transaction = channel.getTransaction(); @@ -188,8 +187,8 @@ public class TestThriftSource { int index = 0; //30 batches of 10 - for(int i = 0; i < 30; i++) { - for(int j = 0; j < 10; j++) { + for (int i = 0; i < 30; i++) { + for (int j = 0; j < 10; j++) { Assert.assertEquals(i, events.get(index++).intValue()); } } @@ -233,8 +232,8 @@ public class TestThriftSource { int index = 0; //10 batches of 500 - for(int i = 0; i < 5; i++) { - for(int j = 0; j < 500; j++) { + for (int i = 0; i < 5; i++) { + for (int j = 0; j < 500; j++) { Assert.assertEquals(i, events.get(index++).intValue()); } } @@ -253,15 +252,14 @@ public class TestThriftSource { context.put(ThriftSource.CONFIG_PORT, String.valueOf(port)); Configurables.configure(source, context); source.start(); - ExecutorCompletionService<Void> completionService = new - ExecutorCompletionService(submitter); + ExecutorCompletionService<Void> completionService = new ExecutorCompletionService(submitter); for (int i = 0; i < 30; i++) { completionService.submit(new SubmitHelper(i), null); } //wait for all threads to be done - for(int i = 0; i < 30; i++) { + for (int i = 0; i < 30; i++) { completionService.take(); } @@ -282,19 +280,20 @@ public class TestThriftSource { int index = 0; //30 batches of 10 - for(int i = 0; i < 30; i++) { - for(int j = 0; j < 10; j++) { + for (int i = 0; i < 30; i++) { + for (int j = 0; j < 10; j++) { Assert.assertEquals(i, events.get(index++).intValue()); } } } private class SubmitHelper implements Runnable { - private final int i; + public SubmitHelper(int i) { this.i = i; } + @Override public void run() { List<Event> events = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/source/http/FlumeHttpServletRequestWrapper.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/http/FlumeHttpServletRequestWrapper.java b/flume-ng-core/src/test/java/org/apache/flume/source/http/FlumeHttpServletRequestWrapper.java index 6b94b2e..475d92f 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/http/FlumeHttpServletRequestWrapper.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/http/FlumeHttpServletRequestWrapper.java @@ -40,7 +40,9 @@ class FlumeHttpServletRequestWrapper implements HttpServletRequest { private BufferedReader reader; String charset; - public FlumeHttpServletRequestWrapper(String data, String charset) throws UnsupportedEncodingException { + + public FlumeHttpServletRequestWrapper(String data, String charset) + throws UnsupportedEncodingException { reader = new BufferedReader(new InputStreamReader( new ByteArrayInputStream(data.getBytes(charset)), charset)); this.charset = charset;
