Updated Branches: refs/heads/trunk 6b9103712 -> 400403267
FLUME-1980. Log4jAppender should optionally drop events if append fails. (Hari Shreedharan via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/40040326 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/40040326 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/40040326 Branch: refs/heads/trunk Commit: 40040326763cdc655ea2707d8cb90a46791192c8 Parents: 6b91037 Author: Mike Percy <[email protected]> Authored: Fri May 10 16:02:56 2013 -0700 Committer: Mike Percy <[email protected]> Committed: Fri May 10 16:02:56 2013 -0700 ---------------------------------------------------------------------- .../log4jappender/LoadBalancingLog4jAppender.java | 32 ++++++- .../flume/clients/log4jappender/Log4jAppender.java | 65 +++++++++++--- .../TestLoadBalancingLog4jAppender.java | 63 +++++++++++++- .../clients/log4jappender/TestLog4jAppender.java | 44 ++++++++++ flume-ng-doc/sphinx/FlumeUserGuide.rst | 3 + 5 files changed, 184 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/40040326/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java ---------------------------------------------------------------------- diff --git a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java index 9fb115e..3172e21 100644 --- a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java +++ b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java @@ -26,6 +26,7 @@ import org.apache.flume.api.RpcClientConfigurationConstants; import org.apache.flume.api.RpcClientFactory; import org.apache.flume.api.RpcClientFactory.ClientType; import org.apache.log4j.helpers.LogLog; +import org.apache.log4j.spi.LoggingEvent; /** * @@ -82,6 +83,7 @@ public class LoadBalancingLog4jAppender extends Log4jAppender { private String hosts; private String selector; private String maxBackoff; + private boolean configured = false; public void setHosts(String hostNames) { this.hosts = hostNames; @@ -95,6 +97,20 @@ public class LoadBalancingLog4jAppender extends Log4jAppender { this.maxBackoff = maxBackoff; } + @Override + public synchronized void append(LoggingEvent event) { + if(!configured) { + String errorMsg = "Flume Log4jAppender not configured correctly! Cannot" + + " send events to Flume."; + LogLog.error(errorMsg); + if(getUnsafeMode()) { + return; + } + throw new FlumeException(errorMsg); + } + super.append(event); + } + /** * Activate the options set using <tt>setHosts()</tt>, <tt>setSelector</tt> * and <tt>setMaxBackoff</tt> @@ -107,18 +123,26 @@ public class LoadBalancingLog4jAppender extends Log4jAppender { try { final Properties properties = getProperties(hosts, selector, maxBackoff); rpcClient = RpcClientFactory.getInstance(properties); - } catch (FlumeException e) { + if(layout != null) { + layout.activateOptions(); + } + configured = true; + } catch (Exception e) { String errormsg = "RPC client creation failed! " + e.getMessage(); LogLog.error(errormsg); - throw e; + if (getUnsafeMode()) { + return; + } + throw new FlumeException(e); } + } private Properties getProperties(String hosts, String selector, String maxBackoff) throws FlumeException { if (StringUtils.isEmpty(hosts)) { - throw new IllegalArgumentException("hosts must not be null"); + throw new FlumeException("hosts must not be null"); } Properties props = new Properties(); @@ -141,7 +165,7 @@ public class LoadBalancingLog4jAppender extends Log4jAppender { if (!StringUtils.isEmpty(maxBackoff)) { long millis = Long.parseLong(maxBackoff.trim()); if (millis <= 0) { - throw new IllegalArgumentException( + throw new FlumeException( "Misconfigured max backoff, value must be greater than 0"); } props.put(RpcClientConfigurationConstants.CONFIG_BACKOFF, http://git-wip-us.apache.org/repos/asf/flume/blob/40040326/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java ---------------------------------------------------------------------- diff --git a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java index d61f807..0ba56d3 100644 --- a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java +++ b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java @@ -30,7 +30,6 @@ import org.apache.flume.api.RpcClientFactory; import org.apache.flume.event.EventBuilder; import org.apache.log4j.AppenderSkeleton; -import org.apache.log4j.Layout; import org.apache.log4j.helpers.LogLog; import org.apache.log4j.spi.LoggingEvent; @@ -63,6 +62,7 @@ public class Log4jAppender extends AppenderSkeleton { private String hostname; private int port; + private boolean unsafeMode = false; RpcClient rpcClient = null; @@ -101,9 +101,14 @@ public class Log4jAppender extends AppenderSkeleton { //setup by setting hostname and port and then calling activateOptions //or this appender object was closed by calling close(), so we throw an //exception to show the appender is no longer accessible. - if(rpcClient == null){ - throw new FlumeException("Cannot Append to Appender!" + - "Appender either closed or not setup correctly!"); + if (rpcClient == null) { + String errorMsg = "Cannot Append to Appender! Appender either closed or" + + " not setup correctly!"; + LogLog.error(errorMsg); + if (unsafeMode) { + return; + } + throw new FlumeException(errorMsg); } if(!rpcClient.isActive()){ @@ -138,6 +143,9 @@ public class Log4jAppender extends AppenderSkeleton { } catch (EventDeliveryException e) { String msg = "Flume append() failed."; LogLog.error(msg); + if (unsafeMode) { + return; + } throw new FlumeException(msg + " Exception follows.", e); } } @@ -152,11 +160,27 @@ public class Log4jAppender extends AppenderSkeleton { * @throws FlumeException if errors occur during close */ @Override - public synchronized void close() throws FlumeException{ - //Any append calls after this will result in an Exception. + public synchronized void close() throws FlumeException { + // Any append calls after this will result in an Exception. if (rpcClient != null) { - rpcClient.close(); - rpcClient = null; + try { + rpcClient.close(); + } catch (FlumeException ex) { + LogLog.error("Error while trying to close RpcClient.", ex); + if (unsafeMode) { + return; + } + throw ex; + } finally { + rpcClient = null; + } + } else { + String errorMsg = "Flume log4jappender already closed!"; + LogLog.error(errorMsg); + if(unsafeMode) { + return; + } + throw new FlumeException(errorMsg); } } @@ -184,25 +208,38 @@ public class Log4jAppender extends AppenderSkeleton { public void setPort(int port){ this.port = port; } + + public void setUnsafeMode(boolean unsafeMode) { + this.unsafeMode = unsafeMode; + } + + public boolean getUnsafeMode() { + return unsafeMode; + } + /** * Activate the options set using <tt>setPort()</tt> * and <tt>setHostname()</tt> + * * @throws FlumeException if the <tt>hostname</tt> and - * <tt>port</tt> combination is invalid. + * <tt>port</tt> combination is invalid. */ @Override - public void activateOptions() throws FlumeException{ + public void activateOptions() throws FlumeException { try { rpcClient = RpcClientFactory.getDefaultInstance(hostname, port); + if (layout != null) { + layout.activateOptions(); + } } catch (FlumeException e) { String errormsg = "RPC client creation failed! " + - e.getMessage(); + e.getMessage(); LogLog.error(errormsg); + if (unsafeMode) { + return; + } throw e; } - if(layout != null) { - layout.activateOptions(); - } } /** http://git-wip-us.apache.org/repos/asf/flume/blob/40040326/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java ---------------------------------------------------------------------- diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java index 657af67..103bcb6 100644 --- a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java +++ b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java @@ -56,7 +56,7 @@ import org.junit.Test; import com.google.common.collect.Lists; -public class TestLoadBalancingLog4jAppender{ +public class TestLoadBalancingLog4jAppender { private final List<CountingAvroSource> sources = Lists.newArrayList(); private Channel ch; @@ -89,7 +89,7 @@ public class TestLoadBalancingLog4jAppender{ File TESTFILE = new File(TestLoadBalancingLog4jAppender.class .getClassLoader() .getResource("flume-loadbalancinglog4jtest.properties").getFile()); - startSources(TESTFILE, new int[] { 25430, 25431 }); + startSources(TESTFILE, false, new int[] { 25430, 25431 }); sendAndAssertMessages(numberOfMsgs); @@ -104,7 +104,8 @@ public class TestLoadBalancingLog4jAppender{ File TESTFILE = new File(TestLoadBalancingLog4jAppender.class .getClassLoader() .getResource("flume-loadbalancing-rnd-log4jtest.properties").getFile()); - startSources(TESTFILE, new int[] { 25430, 25431, 25432, 25433, 25434, + startSources(TESTFILE, false, new int[] { 25430, 25431, 25432, 25433, + 25434, 25435, 25436, 25437, 25438, 25439 }); sendAndAssertMessages(numberOfMsgs); @@ -126,7 +127,7 @@ public class TestLoadBalancingLog4jAppender{ .getClassLoader() .getResource("flume-loadbalancing-backoff-log4jtest.properties") .getFile()); - startSources(TESTFILE, new int[] { 25430, 25431, 25432 }); + startSources(TESTFILE, false, new int[] { 25430, 25431, 25432 }); sources.get(0).setFail(); sources.get(2).setFail(); @@ -154,6 +155,39 @@ public class TestLoadBalancingLog4jAppender{ Assert.assertEquals(0, sources.get(2).appendCount.intValue()); } + @Test + public void testRandomBackoffUnsafeMode() throws Exception { + File TESTFILE = new File(TestLoadBalancingLog4jAppender.class + .getClassLoader() + .getResource("flume-loadbalancing-backoff-log4jtest.properties") + .getFile()); + startSources(TESTFILE, true, new int[]{25430, 25431, 25432}); + + sources.get(0).setFail(); + sources.get(1).setFail(); + sources.get(2).setFail(); + sendAndAssertFail(); + + } + + @Test(expected = EventDeliveryException.class) + public void testRandomBackoffNotUnsafeMode() throws Throwable { + File TESTFILE = new File(TestLoadBalancingLog4jAppender.class + .getClassLoader() + .getResource("flume-loadbalancing-backoff-log4jtest.properties") + .getFile()); + startSources(TESTFILE, false, new int[]{25430, 25431, 25432}); + + sources.get(0).setFail(); + sources.get(1).setFail(); + sources.get(2).setFail(); + try { + sendAndAssertFail(); + } catch (FlumeException ex) { + throw ex.getCause(); + } + } + private void send(int numberOfMsgs) throws EventDeliveryException { for (int count = 0; count < numberOfMsgs; count++) { int level = count % 5; @@ -162,6 +196,21 @@ public class TestLoadBalancingLog4jAppender{ } } + private void sendAndAssertFail() throws IOException { + int level = 20000; + String msg = "This is log message number" + String.valueOf(level); + fixture.log(Level.toLevel(level), msg); + + Transaction transaction = ch.getTransaction(); + transaction.begin(); + Event event = ch.take(); + Assert.assertNull(event); + + transaction.commit(); + transaction.close(); + + } + private void sendAndAssertMessages(int numberOfMsgs) throws IOException { for (int count = 0; count < numberOfMsgs; count++) { int level = count % 5; @@ -194,7 +243,9 @@ public class TestLoadBalancingLog4jAppender{ } - private void startSources(File log4jProps, int... ports) throws IOException { + private void startSources(File log4jProps, boolean unsafeMode, int... ports) + throws + IOException { for (int port : ports) { CountingAvroSource source = new CountingAvroSource(port); Context context = new Context(); @@ -218,6 +269,8 @@ public class TestLoadBalancingLog4jAppender{ FileReader reader = new FileReader(log4jProps); Properties props = new Properties(); props.load(reader); + props.setProperty("log4j.appender.out2.UnsafeMode", + String.valueOf(unsafeMode)); PropertyConfigurator.configure(props); fixture = LogManager.getLogger(TestLoadBalancingLog4jAppender.class); } http://git-wip-us.apache.org/repos/asf/flume/blob/40040326/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java ---------------------------------------------------------------------- diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java index de88730..211837b 100644 --- a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java +++ b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java @@ -29,6 +29,8 @@ import org.apache.flume.Channel; import org.apache.flume.ChannelSelector; import org.apache.flume.Context; import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.FlumeException; import org.apache.flume.Transaction; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.channel.MemoryChannel; @@ -118,6 +120,48 @@ public class TestLog4jAppender{ } @Test + public void testLog4jAppenderFailureUnsafeMode() throws Throwable { + props.setProperty("log4j.appender.out2.UnsafeMode", String.valueOf(true)); + PropertyConfigurator.configure(props); + Logger logger = LogManager.getLogger(TestLog4jAppender.class); + source.stop(); + sendAndAssertFail(logger); + + } + + @Test(expected = EventDeliveryException.class) + public void testLog4jAppenderFailureNotUnsafeMode() throws Throwable { + PropertyConfigurator.configure(props); + Logger logger = LogManager.getLogger(TestLog4jAppender.class); + source.stop(); + sendAndAssertFail(logger); + + } + + private void sendAndAssertFail(Logger logger) throws Throwable { + /* + * Log4j internally defines levels as multiples of 10000. So if we + * create levels directly using count, the level will be set as the + * default. + */ + int level = 20000; + try { + logger.log(Level.toLevel(level), "Test Msg"); + } catch (FlumeException ex) { + ex.printStackTrace(); + throw ex.getCause(); + } + Transaction transaction = ch.getTransaction(); + transaction.begin(); + Event event = ch.take(); + Assert.assertNull(event); + transaction.commit(); + transaction.close(); + + } + + + @Test public void testLayout() throws IOException { props.put("log4j.appender.out2.layout", "org.apache.log4j.PatternLayout"); props.put("log4j.appender.out2.layout.ConversionPattern", http://git-wip-us.apache.org/repos/asf/flume/blob/40040326/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index d129abf..2ee41be 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -2601,6 +2601,7 @@ Property Name Default Description ============= ======= ========================================================================== Hostname -- The hostname on which a remote Flume agent is running with an avro source. Port -- The port at which the remote Flume agent's avro source is listening. +UnsafeMode false If true, the appender will not throw exceptions on failure to send the events. ============= ======= ========================================================================== @@ -2612,6 +2613,7 @@ Sample log4j.properties file: log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender log4j.appender.flume.Hostname = example.com log4j.appender.flume.Port = 41414 + log4j.appender.flume.UnsafeMode = true # configure a class's logger to output to the flume appender log4j.logger.org.example.MyClass = DEBUG,flume @@ -2637,6 +2639,7 @@ Selector ROUND_ROBIN Selection mechanism. Must be either ROUND_ROBIN, MaxBackoff -- A long value representing the maximum amount of time in milliseconds the Load balancing client will backoff from a node that has failed to consume an event. Defaults to no backoff +UnsafeMode false If true, the appender will not throw exceptions on failure to send the events. ============= =========== ==========================================================================
