Updated Branches: refs/heads/flume-1.4 db3984ba6 -> ddfa991e6
FLUME-1818: Support various layouts in log4jappender (Hari Shreedharan via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ddfa991e Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ddfa991e Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ddfa991e Branch: refs/heads/flume-1.4 Commit: ddfa991e6c3ef14b45758f8f092d9101d97d013e Parents: db3984b Author: Brock Noland <[email protected]> Authored: Mon Jan 14 13:30:58 2013 -0800 Committer: Brock Noland <[email protected]> Committed: Mon Jan 14 13:31:10 2013 -0800 ---------------------------------------------------------------------- .../flume/clients/log4jappender/Log4jAppender.java | 22 +++- .../clients/log4jappender/TestLog4jAppender.java | 94 +++++++++++--- 2 files changed, 92 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/ddfa991e/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java ---------------------------------------------------------------------- diff --git a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java index 083f5d1..315a68c 100644 --- a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java +++ b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java @@ -30,6 +30,7 @@ import org.apache.flume.api.RpcClientFactory; import org.apache.flume.event.EventBuilder; import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.Layout; import org.apache.log4j.helpers.LogLog; import org.apache.log4j.spi.LoggingEvent; @@ -121,8 +122,15 @@ public class Log4jAppender extends AppenderSkeleton { String.valueOf(event.getLevel().toInt())); hdrs.put(Log4jAvroHeaders.MESSAGE_ENCODING.toString(), "UTF8"); - Event flumeEvent = EventBuilder.withBody(event.getMessage().toString(), - Charset.forName("UTF8"), hdrs); + String message = null; + if(this.layout != null) { + message = this.layout.format(event); + } else { + message = event.getMessage().toString(); + } + + Event flumeEvent = EventBuilder.withBody( + message, Charset.forName("UTF8"), hdrs); try { rpcClient.append(flumeEvent); @@ -153,7 +161,11 @@ public class Log4jAppender extends AppenderSkeleton { @Override public boolean requiresLayout() { - return false; + // This method is named quite incorrectly in the interface. It should + // probably be called canUseLayout or something. According to the docs, + // even if the appender can work without a layout, if it can work with one, + // this method must return true. + return true; } /** @@ -171,7 +183,6 @@ public class Log4jAppender extends AppenderSkeleton { public void setPort(int port){ this.port = port; } - /** * Activate the options set using <tt>setPort()</tt> * and <tt>setHostname()</tt> @@ -188,6 +199,9 @@ public class Log4jAppender extends AppenderSkeleton { LogLog.error(errormsg); throw e; } + if(layout != null) { + layout.activateOptions(); + } } /** http://git-wip-us.apache.org/repos/asf/flume/blob/ddfa991e/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java ---------------------------------------------------------------------- diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java index 68d95fb..de88730 100644 --- a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java +++ b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java @@ -21,10 +21,7 @@ package org.apache.flume.clients.log4jappender; import java.io.File; import java.io.FileReader; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Properties; +import java.util.*; import junit.framework.Assert; @@ -49,9 +46,10 @@ import org.junit.Test; public class TestLog4jAppender{ private AvroSource source; private Channel ch; + private Properties props; @Before - public void initiate() throws InterruptedException{ + public void initiate() throws Exception{ int port = 25430; source = new AvroSource(); ch = new MemoryChannel(); @@ -71,25 +69,25 @@ public class TestLog4jAppender{ source.setChannelProcessor(new ChannelProcessor(rcs)); source.start(); - - } - @Test - public void testLog4jAppender() throws IOException { - //The properties file having Avro port info should be loaded only - //after the test begins, else log4j tries to connect to the source - //before the source has started up in the above function, since - //log4j setup is completed before the @Before calls also. - //This will cause the test to fail even before it starts! File TESTFILE = new File( TestLog4jAppender.class.getClassLoader() - .getResource("flume-log4jtest.properties").getFile()); + .getResource("flume-log4jtest.properties").getFile()); FileReader reader = new FileReader(TESTFILE); - Properties props = new Properties(); + props = new Properties(); props.load(reader); + reader.close(); + } + @Test + public void testLog4jAppender() throws IOException { PropertyConfigurator.configure(props); Logger logger = LogManager.getLogger(TestLog4jAppender.class); for(int count = 0; count <= 1000; count++){ - int level = count % 5; + /* + * Log4j internally defines levels as multiples of 10000. So if we + * create levels directly using count, the level will be set as the + * default. + */ + int level = ((count % 5)+1)*10000; String msg = "This is log message number" + String.valueOf(count); logger.log(Level.toLevel(level), msg); @@ -104,23 +102,79 @@ public class TestLog4jAppender{ Assert.assertNotNull(hdrs.get(Log4jAvroHeaders.TIMESTAMP.toString())); Assert.assertEquals(Level.toLevel(level), - Level.toLevel(hdrs.get(Log4jAvroHeaders.LOG_LEVEL.toString()))); + Level.toLevel(Integer.valueOf(hdrs.get(Log4jAvroHeaders.LOG_LEVEL + .toString())) + )); + + Assert.assertEquals(logger.getName(), + hdrs.get(Log4jAvroHeaders.LOGGER_NAME.toString())); + + Assert.assertEquals("UTF8", + hdrs.get(Log4jAvroHeaders.MESSAGE_ENCODING.toString())); + transaction.commit(); + transaction.close(); + } + + } + + @Test + public void testLayout() throws IOException { + props.put("log4j.appender.out2.layout", "org.apache.log4j.PatternLayout"); + props.put("log4j.appender.out2.layout.ConversionPattern", + "%-5p [%t]: %m%n"); + PropertyConfigurator.configure(props); + Logger logger = LogManager.getLogger(TestLog4jAppender.class); + Thread.currentThread().setName("Log4jAppenderTest"); + for(int count = 0; count <= 100; count++){ + /* + * Log4j internally defines levels as multiples of 10000. So if we + * create levels directly using count, the level will be set as the + * default. + */ + int level = ((count % 5)+1)*10000; + String msg = "This is log message number" + String.valueOf(count); + + logger.log(Level.toLevel(level), msg); + Transaction transaction = ch.getTransaction(); + transaction.begin(); + Event event = ch.take(); + Assert.assertNotNull(event); + StringBuilder builder = new StringBuilder(); + builder.append("[").append("Log4jAppenderTest").append("]: ") + .append(msg); + //INFO seems to insert an extra space, so lets split the string. + String eventBody = new String(event.getBody(), "UTF-8"); + String eventLevel = eventBody.split("\\s+")[0]; + Assert.assertEquals(Level.toLevel(level).toString(), eventLevel); + Assert.assertEquals( + new String(event.getBody(), "UTF8").trim() + .substring(eventLevel.length()).trim(), builder.toString()); + + Map<String, String> hdrs = event.getHeaders(); + + Assert.assertNotNull(hdrs.get(Log4jAvroHeaders.TIMESTAMP.toString())); + + Assert.assertEquals(Level.toLevel(level), + Level.toLevel(Integer.parseInt(hdrs.get(Log4jAvroHeaders.LOG_LEVEL + .toString())))); Assert.assertEquals(logger.getName(), hdrs.get(Log4jAvroHeaders.LOGGER_NAME.toString())); Assert.assertEquals("UTF8", hdrs.get(Log4jAvroHeaders.MESSAGE_ENCODING.toString())); - //To confirm on console we actually got the body - System.out.println("Got body: "+new String(event.getBody(), "UTF8")); transaction.commit(); transaction.close(); } + } @After public void cleanUp(){ + source.stop(); + ch.stop(); + props.clear(); } }
