Repository: logging-log4j2
Updated Branches:
  refs/heads/master c634c5ff3 -> bfd73824a


LOG4J2-1044 - Write pending events to Flume when the appender is stopped.


Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/bfd73824
Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/bfd73824
Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/bfd73824

Branch: refs/heads/master
Commit: bfd73824a6bf5dcd2a18ac2b9b75a4c20a6ee2eb
Parents: c634c5f
Author: Ralph Goers <[email protected]>
Authored: Sat Aug 8 17:00:53 2015 -0700
Committer: Ralph Goers <[email protected]>
Committed: Sat Aug 8 17:00:53 2015 -0700

----------------------------------------------------------------------
 .../log4j/flume/appender/FlumeAvroManager.java  |  9 +++++
 .../log4j/flume/appender/FlumeAppenderTest.java | 42 ++++++++++++++++++--
 src/changes/changes.xml                         |  3 ++
 3 files changed, 51 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/bfd73824/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
----------------------------------------------------------------------
diff --git 
a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
 
b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
index dfd59bd..22ffddc 100644
--- 
a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
+++ 
b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
@@ -258,6 +258,15 @@ public class FlumeAvroManager extends AbstractFlumeManager 
{
     protected void releaseSub() {
         if (rpcClient != null) {
             try {
+                synchronized(this) {
+                    try {
+                        if (batchSize > 1 && batchEvent.getEvents().size() > 
0) {
+                            send(batchEvent);
+                        }
+                    } catch (final Exception ex) {
+                        LOGGER.error("Error sending final batch: {}", 
ex.getMessage());
+                    }
+                } 
                 rpcClient.close();
             } catch (final Exception ex) {
                 LOGGER.error("Attempt to close RPC client failed", ex);

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/bfd73824/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
----------------------------------------------------------------------
diff --git 
a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
 
b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
index 8589e6c..6bfd168 100644
--- 
a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
+++ 
b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
@@ -173,8 +173,7 @@ public class FlumeAppenderTest {
 
         final Event event = channel.take();
         Assert.assertNotNull(event);
-        Assert.assertTrue("Channel contained event, but not expected message",
-                getBody(event).endsWith("Success"));
+        Assert.assertTrue("Channel contained event, but not expected message", 
getBody(event).endsWith("Success"));
         transaction.commit();
         transaction.close();
 
@@ -259,6 +258,40 @@ public class FlumeAppenderTest {
     }
 
     @Test
+    public void testIncompleteBatch2() throws IOException {
+        final Agent[] agents = new Agent[] { Agent.createAgent("localhost",
+                testPort) };
+        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents,
+                null, "false", "Avro", null, "1000", "1000", "1", "500",
+                "avro", "false", null, null, null, null, null, "true", "10",
+                null, null, null, null);
+        avroAppender.start();
+        avroLogger.addAppender(avroAppender);
+        avroLogger.setLevel(Level.ALL);
+
+        Assert.assertNotNull(avroLogger);
+
+        avroLogger.info("Test message 0");
+
+        final Transaction transaction = channel.getTransaction();
+        transaction.begin();
+
+        avroLogger.info("Test message 1");
+        avroLogger.info("Test message 2");
+        avroAppender.stop();
+        for (int i = 0; i < 3; ++i) {
+            Event event = channel.take();
+            Assert.assertNotNull("No event for item " + i, event);
+            Assert.assertTrue("Channel contained event, but not expected 
message. Received : " + getBody(event),
+                    getBody(event).endsWith("Test message " + i));
+        }
+        transaction.commit();
+        transaction.close();
+
+        eventSource.stop();
+    }
+
+    @Test
     public void testBatch() throws IOException {
         final Agent[] agents = new Agent[] { Agent.createAgent("localhost",
                 testPort) };
@@ -389,7 +422,7 @@ public class FlumeAppenderTest {
 
         Event event = channel.take();
         Assert.assertNotNull(event);
-        Assert.assertTrue("Channel contained event, but not expected message",
+        Assert.assertTrue("Channel contained event, but not expected message. 
Received : " + getBody(event),
                 getBody(event).endsWith("Test message"));
         transaction.commit();
         transaction.close();
@@ -431,6 +464,9 @@ public class FlumeAppenderTest {
     }
 
     private String getBody(final Event event) throws IOException {
+        if (event == null) {
+            return "";
+        }
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
         final InputStream is = new GZIPInputStream(new ByteArrayInputStream(
                 event.getBody()));

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/bfd73824/src/changes/changes.xml
----------------------------------------------------------------------
diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index 22883a7..c381735 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -24,6 +24,9 @@
   </properties>
   <body>
     <release version="2.4" date="2015-MM-DD" description="GA Release 2.4">
+      <action issue="LOG4J2-1044" dev="rgoers" type="fix">
+        Write pending events to Flume when the appender is stopped.
+      </action>
       <action issue="LOG4J2-1017" dev="ggregory" type="update">
         Update Java platform from Java 6 to 7. From this version onwards, 
log4j 2 requires Java 7.
       </action>

Reply via email to