Repository: logging-log4j2 Updated Branches: refs/heads/master 4c6d636b6 -> 3780d00c0
LOG4J2-1044 - Support batchSize in FlumeAvroManager. Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/3780d00c Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/3780d00c Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/3780d00c Branch: refs/heads/master Commit: 3780d00c03770e4357d9def9a0460d6b665c90e3 Parents: 4c6d636 Author: Ralph Goers <[email protected]> Authored: Sun Jun 14 18:17:10 2015 -0700 Committer: Ralph Goers <[email protected]> Committed: Sun Jun 14 18:17:10 2015 -0700 ---------------------------------------------------------------------- .../log4j/flume/appender/FlumeAppender.java | 4 +- .../log4j/flume/appender/FlumeAvroManager.java | 71 ++++++++++++++------ .../flume/appender/FlumePersistentManager.java | 15 ++--- .../log4j/flume/appender/FlumeAppenderTest.java | 41 +++++++++++ src/changes/changes.xml | 3 + 5 files changed, 101 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/3780d00c/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java ---------------------------------------------------------------------- diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java index c37ea1d..1c466ef 100644 --- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java +++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java @@ -247,7 +247,7 @@ public final class FlumeAppender extends AbstractAppender implements FlumeEventF LOGGER.debug("No agents provided, using defaults"); agents = new Agent[] {Agent.createAgent(null, null)}; } - manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeoutMillis, reqTimeoutMillis); + manager = FlumeAvroManager.getManager(name, agents, batchCount, delayMillis, retries, connectTimeoutMillis, reqTimeoutMillis); break; case PERSISTENT: if (agents == null || agents.length == 0) { @@ -263,7 +263,7 @@ public final class FlumeAppender extends AbstractAppender implements FlumeEventF LOGGER.debug("No agents provided, using defaults"); agents = new Agent[] {Agent.createAgent(null, null)}; } - manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeoutMillis, reqTimeoutMillis); + manager = FlumeAvroManager.getManager(name, agents, batchCount, delayMillis, retries, connectTimeoutMillis, reqTimeoutMillis); } if (manager == null) { http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/3780d00c/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java ---------------------------------------------------------------------- diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java index a4d52b6..4f5bd1a 100644 --- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java +++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java @@ -38,6 +38,9 @@ public class FlumeAvroManager extends AbstractFlumeManager { private final int batchSize; + private final long delayNanos; + private final int delayMillis; + private final int retries; private final int connectTimeoutMillis; @@ -48,6 +51,9 @@ public class FlumeAvroManager extends AbstractFlumeManager { private RpcClient rpcClient = null; + private BatchEvent batchEvent = new BatchEvent(); + private long nextSend = 0; + /** * Constructor * @param name The unique name of this manager. @@ -59,10 +65,12 @@ public class FlumeAvroManager extends AbstractFlumeManager { * */ protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize, - final int retries, final int connectTimeout, final int requestTimeout) { + final int delayMillis, final int retries, final int connectTimeout, final int requestTimeout) { super(name); this.agents = agents; this.batchSize = batchSize; + this.delayMillis = delayMillis; + this.delayNanos = delayMillis * 1000000; this.retries = retries; this.connectTimeoutMillis = connectTimeout; this.requestTimeoutMillis = requestTimeout; @@ -74,12 +82,13 @@ public class FlumeAvroManager extends AbstractFlumeManager { * @param name The name of the manager. * @param agents The agents to use. * @param batchSize The number of events to include in a batch. + * @param delayMillis The number of milliseconds to wait before sending an incomplete batch. * @param retries The number of times to retry connecting before giving up. * @param connectTimeoutMillis The connection timeout in ms. * @param requestTimeoutMillis The request timeout in ms. * @return A FlumeAvroManager. */ - public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize, + public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize, int delayMillis, final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) { if (agents == null || agents.length == 0) { throw new IllegalArgumentException("At least one agent is required"); @@ -100,7 +109,7 @@ public class FlumeAvroManager extends AbstractFlumeManager { } sb.append(']'); return getManager(sb.toString(), factory, - new FactoryData(name, agents, batchSize, retries, connectTimeoutMillis, requestTimeoutMillis)); + new FactoryData(name, agents, batchSize, delayMillis, retries, connectTimeoutMillis, requestTimeoutMillis)); } /** @@ -135,6 +144,10 @@ public class FlumeAvroManager extends AbstractFlumeManager { return batchSize; } + public int getDelayMillis() { + return delayMillis; + } + public synchronized void send(final BatchEvent events) { if (rpcClient == null) { rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis); @@ -162,26 +175,38 @@ public class FlumeAvroManager extends AbstractFlumeManager { @Override public synchronized void send(final Event event) { - if (rpcClient == null) { - rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis); - } + if (batchSize == 1) { + if (rpcClient == null) { + rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis); + } - if (rpcClient != null) { - try { - rpcClient.append(event); - } catch (final Exception ex) { - rpcClient.close(); - rpcClient = null; + if (rpcClient != null) { + try { + rpcClient.append(event); + } catch (final Exception ex) { + rpcClient.close(); + rpcClient = null; + final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' + + agents[current].getPort(); + LOGGER.warn(msg, ex); + throw new AppenderLoggingException("No Flume agents are available"); + } + } else { final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' + - agents[current].getPort(); - LOGGER.warn(msg, ex); + agents[current].getPort(); + LOGGER.warn(msg); throw new AppenderLoggingException("No Flume agents are available"); } } else { - final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' + - agents[current].getPort(); - LOGGER.warn(msg); - throw new AppenderLoggingException("No Flume agents are available"); + batchEvent.addEvent(event); + int count = batchEvent.getEvents().size(); + if (count == 1) { + nextSend = System.nanoTime() + delayNanos; + } + if (count >= batchSize || System.nanoTime() >= nextSend) { + send(batchEvent); + batchEvent = new BatchEvent(); + } } } @@ -248,6 +273,7 @@ public class FlumeAvroManager extends AbstractFlumeManager { private final String name; private final Agent[] agents; private final int batchSize; + private final int delayMillis; private final int retries; private final int conntectTimeoutMillis; private final int requestTimeoutMillis; @@ -258,11 +284,12 @@ public class FlumeAvroManager extends AbstractFlumeManager { * @param agents The agents. * @param batchSize The number of events to include in a batch. */ - public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries, - final int connectTimeoutMillis, final int requestTimeoutMillis) { + public FactoryData(final String name, final Agent[] agents, final int batchSize, final int delayMillis, + final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) { this.name = name; this.agents = agents; this.batchSize = batchSize; + this.delayMillis = delayMillis; this.retries = retries; this.conntectTimeoutMillis = connectTimeoutMillis; this.requestTimeoutMillis = requestTimeoutMillis; @@ -284,8 +311,8 @@ public class FlumeAvroManager extends AbstractFlumeManager { public FlumeAvroManager createManager(final String name, final FactoryData data) { try { - return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.retries, - data.conntectTimeoutMillis, data.requestTimeoutMillis); + return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.delayMillis, + data.retries, data.conntectTimeoutMillis, data.requestTimeoutMillis); } catch (final Exception ex) { LOGGER.error("Could not create FlumeAvroManager", ex); } http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/3780d00c/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java ---------------------------------------------------------------------- diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java index e643d31..c0f8879 100644 --- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java +++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java @@ -90,8 +90,6 @@ public class FlumePersistentManager extends FlumeAvroManager { private final SecretKey secretKey; - private final int delayMillis; - private final int lockTimeoutRetryCount; private final ExecutorService threadPool; @@ -118,8 +116,7 @@ public class FlumePersistentManager extends FlumeAvroManager { final int requestTimeout, final int delay, final Database database, final Environment environment, final SecretKey secretKey, final int lockTimeoutRetryCount) { - super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout); - this.delayMillis = delay; + super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout); this.database = database; this.environment = environment; dbCount.set(database.count()); @@ -515,14 +512,14 @@ public class FlumePersistentManager extends FlumeAvroManager { @Override public void run() { - LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.delayMillis); - long nextBatchMillis = System.currentTimeMillis() + manager.delayMillis; + LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis()); + long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis(); while (!shutdown) { final long nowMillis = System.currentTimeMillis(); final long dbCount = database.count(); dbCounter.set(dbCount); if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) { - nextBatchMillis = nowMillis + manager.delayMillis; + nextBatchMillis = nowMillis + manager.getDelayMillis(); try { boolean errors = false; final DatabaseEntry key = new DatabaseEntry(); @@ -621,7 +618,7 @@ public class FlumePersistentManager extends FlumeAvroManager { } } if (errors) { - Thread.sleep(manager.delayMillis); + Thread.sleep(manager.getDelayMillis()); continue; } } catch (final Exception ex) { @@ -629,7 +626,7 @@ public class FlumePersistentManager extends FlumeAvroManager { } } else { if (nextBatchMillis <= nowMillis) { - nextBatchMillis = nowMillis + manager.delayMillis; + nextBatchMillis = nowMillis + manager.getDelayMillis(); } try { final long interval = nextBatchMillis - nowMillis; http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/3780d00c/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java ---------------------------------------------------------------------- diff --git a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java index 7c2e9b9..2715abe 100644 --- a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java +++ b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java @@ -217,6 +217,47 @@ public class FlumeAppenderTest { eventSource.stop(); } + + @Test + public void testIncompleteBatch() throws IOException { + final Agent[] agents = new Agent[] { Agent.createAgent("localhost", + testPort) }; + final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, + null, "false", "Avro", null, "1000", "1000", "1", "500", + "avro", "false", null, null, null, null, null, "true", "10", + null, null, null, null); + avroAppender.start(); + avroLogger.addAppender(avroAppender); + avroLogger.setLevel(Level.ALL); + + Assert.assertNotNull(avroLogger); + + avroLogger.info("Test message 0"); + + final Transaction transaction = channel.getTransaction(); + transaction.begin(); + + Event event = channel.take(); + Assert.assertNull("Received event", event); + + try { + Thread.sleep(500); + } catch (InterruptedException ie) { + } + + avroLogger.info("Test message 1"); + for (int i = 0; i < 2; ++i) { + event = channel.take(); + Assert.assertNotNull("No event for item " + i, event); + Assert.assertTrue("Channel contained event, but not expected message", + getBody(event).endsWith("Test message " + i)); + } + transaction.commit(); + transaction.close(); + + eventSource.stop(); + } + @Test public void testBatch() throws IOException { final Agent[] agents = new Agent[] { Agent.createAgent("localhost", http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/3780d00c/src/changes/changes.xml ---------------------------------------------------------------------- diff --git a/src/changes/changes.xml b/src/changes/changes.xml index 849457b..61cd933 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -24,6 +24,9 @@ </properties> <body> <release version="2.4" date="2015-MM-DD" description="GA Release 2.4"> + <action issue="LOG4J2=1044" dev="rgoers" type="update"> + Support batchSize in FlumeAvroManager. + </action> <action issue="LOG4J2-767" dev="ggregory" type="add" due-to="Mikael Ståldal"> New module for Liquibase integration. </action>
