Repository: logging-log4j2 Updated Branches: refs/heads/master c634c5ff3 -> bfd73824a
LOG4J2-1044 - Write pending events to Flume when the appender is stopped. Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/bfd73824 Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/bfd73824 Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/bfd73824 Branch: refs/heads/master Commit: bfd73824a6bf5dcd2a18ac2b9b75a4c20a6ee2eb Parents: c634c5f Author: Ralph Goers <[email protected]> Authored: Sat Aug 8 17:00:53 2015 -0700 Committer: Ralph Goers <[email protected]> Committed: Sat Aug 8 17:00:53 2015 -0700 ---------------------------------------------------------------------- .../log4j/flume/appender/FlumeAvroManager.java | 9 +++++ .../log4j/flume/appender/FlumeAppenderTest.java | 42 ++++++++++++++++++-- src/changes/changes.xml | 3 ++ 3 files changed, 51 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/bfd73824/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java ---------------------------------------------------------------------- diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java index dfd59bd..22ffddc 100644 --- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java +++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java @@ -258,6 +258,15 @@ public class FlumeAvroManager extends AbstractFlumeManager { protected void releaseSub() { if (rpcClient != null) { try { + synchronized(this) { + try { + if (batchSize > 1 && batchEvent.getEvents().size() > 0) { + send(batchEvent); + } + } catch (final Exception ex) { + LOGGER.error("Error sending final batch: {}", ex.getMessage()); + } + } rpcClient.close(); } catch (final Exception ex) { LOGGER.error("Attempt to close RPC client failed", ex); http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/bfd73824/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java ---------------------------------------------------------------------- diff --git a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java index 8589e6c..6bfd168 100644 --- a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java +++ b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java @@ -173,8 +173,7 @@ public class FlumeAppenderTest { final Event event = channel.take(); Assert.assertNotNull(event); - Assert.assertTrue("Channel contained event, but not expected message", - getBody(event).endsWith("Success")); + Assert.assertTrue("Channel contained event, but not expected message", getBody(event).endsWith("Success")); transaction.commit(); transaction.close(); @@ -259,6 +258,40 @@ public class FlumeAppenderTest { } @Test + public void testIncompleteBatch2() throws IOException { + final Agent[] agents = new Agent[] { Agent.createAgent("localhost", + testPort) }; + final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, + null, "false", "Avro", null, "1000", "1000", "1", "500", + "avro", "false", null, null, null, null, null, "true", "10", + null, null, null, null); + avroAppender.start(); + avroLogger.addAppender(avroAppender); + avroLogger.setLevel(Level.ALL); + + Assert.assertNotNull(avroLogger); + + avroLogger.info("Test message 0"); + + final Transaction transaction = channel.getTransaction(); + transaction.begin(); + + avroLogger.info("Test message 1"); + avroLogger.info("Test message 2"); + avroAppender.stop(); + for (int i = 0; i < 3; ++i) { + Event event = channel.take(); + Assert.assertNotNull("No event for item " + i, event); + Assert.assertTrue("Channel contained event, but not expected message. Received : " + getBody(event), + getBody(event).endsWith("Test message " + i)); + } + transaction.commit(); + transaction.close(); + + eventSource.stop(); + } + + @Test public void testBatch() throws IOException { final Agent[] agents = new Agent[] { Agent.createAgent("localhost", testPort) }; @@ -389,7 +422,7 @@ public class FlumeAppenderTest { Event event = channel.take(); Assert.assertNotNull(event); - Assert.assertTrue("Channel contained event, but not expected message", + Assert.assertTrue("Channel contained event, but not expected message. Received : " + getBody(event), getBody(event).endsWith("Test message")); transaction.commit(); transaction.close(); @@ -431,6 +464,9 @@ public class FlumeAppenderTest { } private String getBody(final Event event) throws IOException { + if (event == null) { + return ""; + } final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final InputStream is = new GZIPInputStream(new ByteArrayInputStream( event.getBody())); http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/bfd73824/src/changes/changes.xml ---------------------------------------------------------------------- diff --git a/src/changes/changes.xml b/src/changes/changes.xml index 22883a7..c381735 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -24,6 +24,9 @@ </properties> <body> <release version="2.4" date="2015-MM-DD" description="GA Release 2.4"> + <action issue="LOG4J2-1044" dev="rgoers" type="fix"> + Write pending events to Flume when the appender is stopped. + </action> <action issue="LOG4J2-1017" dev="ggregory" type="update"> Update Java platform from Java 6 to 7. From this version onwards, log4j 2 requires Java 7. </action>
