Updated Branches: refs/heads/flume-1.4 062ca69be -> 4a9786468
FLUME-2042. log4jappender timeout should be configurable (Hari Shreedharan via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/4a978646 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/4a978646 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/4a978646 Branch: refs/heads/flume-1.4 Commit: 4a9786468d40ce1766983f3c45b1ecb8f40532b1 Parents: 062ca69 Author: Mike Percy <[email protected]> Authored: Sat May 11 14:30:48 2013 -0700 Committer: Mike Percy <[email protected]> Committed: Sat May 11 14:31:28 2013 -0700 ---------------------------------------------------------------------- .../log4jappender/LoadBalancingLog4jAppender.java | 9 ++- .../flume/clients/log4jappender/Log4jAppender.java | 23 +++++- .../TestLoadBalancingLog4jAppender.java | 30 +++++++ .../clients/log4jappender/TestLog4jAppender.java | 68 +++++++++++++-- 4 files changed, 120 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/4a978646/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java ---------------------------------------------------------------------- diff --git a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java index 3172e21..713234f 100644 --- a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java +++ b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java @@ -121,7 +121,8 @@ public class LoadBalancingLog4jAppender extends Log4jAppender { @Override public void activateOptions() throws FlumeException { try { - final Properties properties = getProperties(hosts, selector, maxBackoff); + final Properties properties = getProperties(hosts, selector, + maxBackoff, getTimeout()); rpcClient = RpcClientFactory.getInstance(properties); if(layout != null) { layout.activateOptions(); @@ -139,7 +140,7 @@ public class LoadBalancingLog4jAppender extends Log4jAppender { } private Properties getProperties(String hosts, String selector, - String maxBackoff) throws FlumeException { + String maxBackoff, long timeout) throws FlumeException { if (StringUtils.isEmpty(hosts)) { throw new FlumeException("hosts must not be null"); @@ -172,6 +173,10 @@ public class LoadBalancingLog4jAppender extends Log4jAppender { String.valueOf(true)); props.put(RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF, maxBackoff); } + props.setProperty(RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT, + String.valueOf(timeout)); + props.setProperty(RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT, + String.valueOf(timeout)); return props; } } http://git-wip-us.apache.org/repos/asf/flume/blob/4a978646/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java ---------------------------------------------------------------------- diff --git a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java index 0ba56d3..532b761 100644 --- a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java +++ b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java @@ -21,11 +21,13 @@ package org.apache.flume.clients.log4jappender; import java.nio.charset.Charset; import java.util.HashMap; import java.util.Map; +import java.util.Properties; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; import org.apache.flume.api.RpcClient; +import org.apache.flume.api.RpcClientConfigurationConstants; import org.apache.flume.api.RpcClientFactory; import org.apache.flume.event.EventBuilder; @@ -63,6 +65,8 @@ public class Log4jAppender extends AppenderSkeleton { private String hostname; private int port; private boolean unsafeMode = false; + private long timeout = RpcClientConfigurationConstants + .DEFAULT_REQUEST_TIMEOUT_MILLIS; RpcClient rpcClient = null; @@ -217,6 +221,15 @@ public class Log4jAppender extends AppenderSkeleton { return unsafeMode; } + public void setTimeout(long timeout) { + this.timeout = timeout; + } + + public long getTimeout() { + return this.timeout; + } + + /** * Activate the options set using <tt>setPort()</tt> * and <tt>setHostname()</tt> @@ -226,8 +239,16 @@ public class Log4jAppender extends AppenderSkeleton { */ @Override public void activateOptions() throws FlumeException { + Properties props = new Properties(); + props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1"); + props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "h1", + hostname + ":" + port); + props.setProperty(RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT, + String.valueOf(timeout)); + props.setProperty(RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT, + String.valueOf(timeout)); try { - rpcClient = RpcClientFactory.getDefaultInstance(hostname, port); + rpcClient = RpcClientFactory.getInstance(props); if (layout != null) { layout.activateOptions(); } http://git-wip-us.apache.org/repos/asf/flume/blob/4a978646/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java ---------------------------------------------------------------------- diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java index 103bcb6..267ac1d 100644 --- a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java +++ b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java @@ -62,10 +62,16 @@ public class TestLoadBalancingLog4jAppender { private Channel ch; private ChannelSelector rcs; private Logger fixture; + private boolean slowDown = false; @Before public void initiate() throws InterruptedException{ ch = new MemoryChannel(); + configureChannel(); + + } + + private void configureChannel() { Configurables.configure(ch, new Context()); List<Channel> channels = new ArrayList<Channel>(); @@ -170,6 +176,27 @@ public class TestLoadBalancingLog4jAppender { } + @Test (expected = EventDeliveryException.class) + public void testTimeout() throws Throwable { + File TESTFILE = new File(TestLoadBalancingLog4jAppender.class + .getClassLoader() + .getResource("flume-loadbalancinglog4jtest.properties") + .getFile()); + + ch = new TestLog4jAppender.SlowMemoryChannel(2000); + configureChannel(); + slowDown = true; + startSources(TESTFILE, false, new int[]{25430, 25431, 25432}); + int level = 20000; + String msg = "This is log message number" + String.valueOf(level); + try { + fixture.log(Level.toLevel(level), msg); + } catch (FlumeException ex) { + throw ex.getCause(); + } + + } + @Test(expected = EventDeliveryException.class) public void testRandomBackoffNotUnsafeMode() throws Throwable { File TESTFILE = new File(TestLoadBalancingLog4jAppender.class @@ -271,6 +298,9 @@ public class TestLoadBalancingLog4jAppender { props.load(reader); props.setProperty("log4j.appender.out2.UnsafeMode", String.valueOf(unsafeMode)); + if(slowDown) { + props.setProperty("log4j.appender.out2.Timeout", String.valueOf(1000)); + } PropertyConfigurator.configure(props); fixture = LogManager.getLogger(TestLoadBalancingLog4jAppender.class); } http://git-wip-us.apache.org/repos/asf/flume/blob/4a978646/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java ---------------------------------------------------------------------- diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java index 211837b..1b840f3 100644 --- a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java +++ b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java @@ -22,6 +22,7 @@ import java.io.File; import java.io.FileReader; import java.io.IOException; import java.util.*; +import java.util.concurrent.TimeUnit; import junit.framework.Assert; @@ -62,6 +63,16 @@ public class TestLog4jAppender{ context.put("bind", "localhost"); Configurables.configure(source, context); + File TESTFILE = new File( + TestLog4jAppender.class.getClassLoader() + .getResource("flume-log4jtest.properties").getFile()); + FileReader reader = new FileReader(TESTFILE); + props = new Properties(); + props.load(reader); + reader.close(); + } + + private void configureSource() { List<Channel> channels = new ArrayList<Channel>(); channels.add(ch); @@ -71,16 +82,10 @@ public class TestLog4jAppender{ source.setChannelProcessor(new ChannelProcessor(rcs)); source.start(); - File TESTFILE = new File( - TestLog4jAppender.class.getClassLoader() - .getResource("flume-log4jtest.properties").getFile()); - FileReader reader = new FileReader(TESTFILE); - props = new Properties(); - props.load(reader); - reader.close(); } @Test public void testLog4jAppender() throws IOException { + configureSource(); PropertyConfigurator.configure(props); Logger logger = LogManager.getLogger(TestLog4jAppender.class); for(int count = 0; count <= 1000; count++){ @@ -121,6 +126,7 @@ public class TestLog4jAppender{ @Test public void testLog4jAppenderFailureUnsafeMode() throws Throwable { + configureSource(); props.setProperty("log4j.appender.out2.UnsafeMode", String.valueOf(true)); PropertyConfigurator.configure(props); Logger logger = LogManager.getLogger(TestLog4jAppender.class); @@ -131,6 +137,7 @@ public class TestLog4jAppender{ @Test(expected = EventDeliveryException.class) public void testLog4jAppenderFailureNotUnsafeMode() throws Throwable { + configureSource(); PropertyConfigurator.configure(props); Logger logger = LogManager.getLogger(TestLog4jAppender.class); source.stop(); @@ -163,6 +170,7 @@ public class TestLog4jAppender{ @Test public void testLayout() throws IOException { + configureSource(); props.put("log4j.appender.out2.layout", "org.apache.log4j.PatternLayout"); props.put("log4j.appender.out2.layout.ConversionPattern", "%-5p [%t]: %m%n"); @@ -214,6 +222,34 @@ public class TestLog4jAppender{ } + @Test(expected = EventDeliveryException.class) + public void testSlowness() throws Throwable { + ch = new SlowMemoryChannel(2000); + Configurables.configure(ch, new Context()); + configureSource(); + props.put("log4j.appender.out2.Timeout", "1000"); + props.put("log4j.appender.out2.layout", "org.apache.log4j.PatternLayout"); + props.put("log4j.appender.out2.layout.ConversionPattern", + "%-5p [%t]: %m%n"); + PropertyConfigurator.configure(props); + Logger logger = LogManager.getLogger(TestLog4jAppender.class); + Thread.currentThread().setName("Log4jAppenderTest"); + int level = 10000; + String msg = "This is log message number" + String.valueOf(1); + try { + logger.log(Level.toLevel(level), msg); + } catch (FlumeException ex) { + throw ex.getCause(); + } + } + + @Test // Should not throw + public void testSlownessUnsafeMode() throws Throwable { + props.setProperty("log4j.appender.out2.UnsafeMode", String.valueOf(true)); + testSlowness(); + } + + @After public void cleanUp(){ source.stop(); @@ -221,4 +257,22 @@ public class TestLog4jAppender{ props.clear(); } + + static class SlowMemoryChannel extends MemoryChannel { + private final int slowTime; + + public SlowMemoryChannel(int slowTime) { + this.slowTime = slowTime; + } + + public void put(Event e) { + try { + TimeUnit.MILLISECONDS.sleep(slowTime); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + super.put(e); + } + } + }
