This is an automated email from the ASF dual-hosted git repository.

rgoers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/logging-log4j2.git


The following commit(s) were added to refs/heads/master by this push:
     new a29779b556 LOG4J2-3536 - Upgrade Flume Appender to Flume 1.10.0
a29779b556 is described below

commit a29779b55614c79e9b70644125333cf124fc6b6b
Author: Ralph Goers <[email protected]>
AuthorDate: Tue Jul 5 00:47:45 2022 -0700

    LOG4J2-3536 - Upgrade Flume Appender to Flume 1.10.0
---
 log4j-flume-ng/pom.xml                             | 13 ++++----
 .../flume/appender/FlumeEmbeddedAgentTest.java     | 34 +++++++++++++--------
 .../flume/appender/FlumeEmbeddedAppenderTest.java  | 35 ++++++++++++++--------
 .../appender/FlumePersistentAppenderTest.java      | 33 +++++++++++++-------
 .../log4j/flume/appender/FlumePersistentPerf.java  | 34 +++++++++++++--------
 pom.xml                                            |  9 +++++-
 6 files changed, 101 insertions(+), 57 deletions(-)

diff --git a/log4j-flume-ng/pom.xml b/log4j-flume-ng/pom.xml
index 6bc60c9da2..bf29ed14c0 100644
--- a/log4j-flume-ng/pom.xml
+++ b/log4j-flume-ng/pom.xml
@@ -83,14 +83,6 @@
       <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-sdk</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.codehaus.jackson</groupId>
-      <artifactId>jackson-core-asl</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.codehaus.jackson</groupId>
-      <artifactId>jackson-mapper-asl</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-core</artifactId>
@@ -111,6 +103,11 @@
       <artifactId>hadoop-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git 
a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java
 
b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java
index df3c8eb3de..d9cc295264 100644
--- 
a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java
+++ 
b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java
@@ -34,9 +34,8 @@ import java.util.zip.GZIPInputStream;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import org.apache.avro.AvroRemoteException;
-import org.apache.avro.ipc.NettyServer;
-import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.netty.NettyServer;
+import org.apache.avro.ipc.Server;
 import org.apache.avro.ipc.specific.SpecificResponder;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
@@ -60,6 +59,8 @@ import org.junit.Test;
 
 import com.google.common.base.Preconditions;
 
+import static org.junit.Assert.fail;
+
 /**
  *
  */
@@ -226,17 +227,27 @@ public class FlumeEmbeddedAgentTest {
     private static class EventCollector implements AvroSourceProtocol {
         private final LinkedBlockingQueue<AvroFlumeEvent> eventQueue = new 
LinkedBlockingQueue<>();
 
-        private final NettyServer nettyServer;
-
+        private Server server;
 
         public EventCollector(final int port) {
-            final Responder responder = new 
SpecificResponder(AvroSourceProtocol.class, this);
-            nettyServer = new NettyServer(responder, new 
InetSocketAddress(HOSTNAME, port));
-            nettyServer.start();
+            try {
+                server = createServer(this, port);
+            } catch (InterruptedException ex) {
+                fail("Server creation was interrrupted");
+            }
+            server.start();
+        }
+
+        private Server createServer(AvroSourceProtocol protocol, final int 
port) throws InterruptedException {
+
+            server = new NettyServer(new 
SpecificResponder(AvroSourceProtocol.class, protocol),
+                    new InetSocketAddress(HOSTNAME, port));
+
+            return server;
         }
 
         public void stop() {
-            nettyServer.close();
+            server.close();
         }
 
         public Event poll() {
@@ -255,14 +266,13 @@ public class FlumeEmbeddedAgentTest {
         }
 
         @Override
-        public Status append(final AvroFlumeEvent event) throws 
AvroRemoteException {
+        public Status append(final AvroFlumeEvent event) {
             eventQueue.add(event);
             return Status.OK;
         }
 
         @Override
-        public Status appendBatch(final List<AvroFlumeEvent> events)
-            throws AvroRemoteException {
+        public Status appendBatch(final List<AvroFlumeEvent> events) {
             Preconditions.checkState(eventQueue.addAll(events));
             return Status.OK;
         }
diff --git 
a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java
 
b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java
index 5a01f0fbd3..f4afe1bedd 100644
--- 
a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java
+++ 
b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java
@@ -34,9 +34,8 @@ import java.util.zip.GZIPInputStream;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import org.apache.avro.AvroRemoteException;
-import org.apache.avro.ipc.NettyServer;
-import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.Server;
+import org.apache.avro.ipc.netty.NettyServer;
 import org.apache.avro.ipc.specific.SpecificResponder;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
@@ -61,6 +60,8 @@ import org.junit.Test;
 
 import com.google.common.base.Preconditions;
 
+import static org.junit.Assert.fail;
+
 /**
  *
  */
@@ -256,18 +257,27 @@ public class FlumeEmbeddedAppenderTest {
     private static class EventCollector implements AvroSourceProtocol {
         private final LinkedBlockingQueue<AvroFlumeEvent> eventQueue = new 
LinkedBlockingQueue<>();
 
-        private final NettyServer nettyServer;
-
+        private Server server;
 
         public EventCollector(final int port) {
-            final Responder responder = new 
SpecificResponder(AvroSourceProtocol.class, this);
-            System.out.println("Collector listening on port " + port);
-            nettyServer = new NettyServer(responder, new 
InetSocketAddress(HOSTNAME, port));
-            nettyServer.start();
+            try {
+                server = createServer(this, port);
+            } catch (InterruptedException ex) {
+                fail("Server creation was interrrupted");
+            }
+            server.start();
+        }
+
+        private Server createServer(AvroSourceProtocol protocol, final int 
port) throws InterruptedException {
+
+            server = new NettyServer(new 
SpecificResponder(AvroSourceProtocol.class, protocol),
+                    new InetSocketAddress(HOSTNAME, port));
+
+            return server;
         }
 
         public void stop() {
-            nettyServer.close();
+            server.close();
         }
 
         public Event poll() {
@@ -286,14 +296,13 @@ public class FlumeEmbeddedAppenderTest {
         }
 
         @Override
-        public Status append(final AvroFlumeEvent event) throws 
AvroRemoteException {
+        public Status append(final AvroFlumeEvent event) {
             eventQueue.add(event);
             return Status.OK;
         }
 
         @Override
-        public Status appendBatch(final List<AvroFlumeEvent> events)
-            throws AvroRemoteException {
+        public Status appendBatch(final List<AvroFlumeEvent> events) {
             Preconditions.checkState(eventQueue.addAll(events));
             return Status.OK;
         }
diff --git 
a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java
 
b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java
index cd856f28fb..8a925d407a 100644
--- 
a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java
+++ 
b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java
@@ -36,9 +36,8 @@ import java.util.zip.GZIPInputStream;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import org.apache.avro.AvroRemoteException;
-import org.apache.avro.ipc.NettyServer;
-import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.Server;
+import org.apache.avro.ipc.netty.NettyServer;
 import org.apache.avro.ipc.specific.SpecificResponder;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
@@ -64,6 +63,8 @@ import org.junit.Test;
 
 import com.google.common.base.Preconditions;
 
+import static org.junit.Assert.fail;
+
 /**
  *
  */
@@ -406,17 +407,27 @@ public class FlumePersistentAppenderTest {
     private static class EventCollector implements AvroSourceProtocol {
         private final LinkedBlockingQueue<AvroFlumeEvent> eventQueue = new 
LinkedBlockingQueue<>();
 
-        private final NettyServer nettyServer;
-
+        private Server server;
 
         public EventCollector(final int port) {
-            final Responder responder = new 
SpecificResponder(AvroSourceProtocol.class, this);
-            nettyServer = new NettyServer(responder, new 
InetSocketAddress(HOSTNAME, port));
-            nettyServer.start();
+            try {
+                server = createServer(this, port);
+            } catch (InterruptedException ex) {
+                fail("Server creation was interrrupted");
+            }
+            server.start();
+        }
+
+        private Server createServer(AvroSourceProtocol protocol, final int 
port) throws InterruptedException {
+
+            server = new NettyServer(new 
SpecificResponder(AvroSourceProtocol.class, protocol),
+                    new InetSocketAddress(HOSTNAME, port));
+
+            return server;
         }
 
         public void stop() {
-            nettyServer.close();
+            server.close();
         }
 
         public Event poll() {
@@ -435,14 +446,14 @@ public class FlumePersistentAppenderTest {
         }
 
         @Override
-        public Status append(final AvroFlumeEvent event) throws 
AvroRemoteException {
+        public Status append(final AvroFlumeEvent event) {
             eventQueue.add(event);
             //System.out.println("Received event " + 
event.getHeaders().get(new org.apache.avro.util.Utf8(FlumeEvent.GUID)));
             return Status.OK;
         }
 
         @Override
-        public Status appendBatch(final List<AvroFlumeEvent> events) throws 
AvroRemoteException {
+        public Status appendBatch(final List<AvroFlumeEvent> events) {
             Preconditions.checkState(eventQueue.addAll(events));
             for (final AvroFlumeEvent event : events) {
                 // System.out.println("Received event " + 
event.getHeaders().get(new org.apache.avro.util.Utf8(FlumeEvent.GUID)));
diff --git 
a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentPerf.java
 
b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentPerf.java
index 9dd0eae2ac..e2b19aa9b6 100644
--- 
a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentPerf.java
+++ 
b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentPerf.java
@@ -34,9 +34,8 @@ import java.util.zip.GZIPInputStream;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import org.apache.avro.AvroRemoteException;
-import org.apache.avro.ipc.NettyServer;
-import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.Server;
+import org.apache.avro.ipc.netty.NettyServer;
 import org.apache.avro.ipc.specific.SpecificResponder;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
@@ -57,6 +56,8 @@ import org.junit.Test;
 
 import com.google.common.base.Preconditions;
 
+import static org.junit.Assert.fail;
+
 /**
  *
  */
@@ -167,17 +168,27 @@ public class FlumePersistentPerf {
     private static class EventCollector implements AvroSourceProtocol {
         private final LinkedBlockingQueue<AvroFlumeEvent> eventQueue = new 
LinkedBlockingQueue<>();
 
-        private final NettyServer nettyServer;
-
+        private Server server;
 
         public EventCollector(final int port) {
-            final Responder responder = new 
SpecificResponder(AvroSourceProtocol.class, this);
-            nettyServer = new NettyServer(responder, new 
InetSocketAddress(HOSTNAME, port));
-            nettyServer.start();
+            try {
+                server = createServer(this, port);
+            } catch (InterruptedException ex) {
+                fail("Server creation was interrrupted");
+            }
+            server.start();
+        }
+
+        private Server createServer(AvroSourceProtocol protocol, final int 
port) throws InterruptedException {
+
+            server = new NettyServer(new 
SpecificResponder(AvroSourceProtocol.class, protocol),
+                    new InetSocketAddress(HOSTNAME, port));
+
+            return server;
         }
 
         public void stop() {
-            nettyServer.close();
+            server.close();
         }
 
         public Event poll() {
@@ -196,14 +207,13 @@ public class FlumePersistentPerf {
         }
 
         @Override
-        public Status append(final AvroFlumeEvent event) throws 
AvroRemoteException {
+        public Status append(final AvroFlumeEvent event) {
             eventQueue.add(event);
             return Status.OK;
         }
 
         @Override
-        public Status appendBatch(final List<AvroFlumeEvent> events)
-            throws AvroRemoteException {
+        public Status appendBatch(final List<AvroFlumeEvent> events) {
             Preconditions.checkState(eventQueue.addAll(events));
             return Status.OK;
         }
diff --git a/pom.xml b/pom.xml
index cd1ae3f503..62d3d54e9e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -228,7 +228,7 @@
     <spring-boot.version>2.6.7</spring-boot.version>
     <springVersion>5.3.19</springVersion>
     <kubernetes-client.version>4.6.1</kubernetes-client.version>
-    <flumeVersion>1.9.0</flumeVersion>
+    <flumeVersion>1.10.0</flumeVersion>
     <disruptorVersion>3.4.4</disruptorVersion>
     <conversantDisruptorVersion>1.2.19</conversantDisruptorVersion>
     <mongodb3.version>3.12.4</mongodb3.version>
@@ -290,6 +290,7 @@
     <argLine>-Xms256m -Xmx1024m</argLine>
     <javaTargetVersion>11</javaTargetVersion>
     <module.name />
+    <netty-all.version>4.1.72.Final</netty-all.version>
   </properties>
   <pluginRepositories>
     <pluginRepository>
@@ -518,6 +519,12 @@
           </exclusion>
         </exclusions>
       </dependency>
+      <dependency>
+        <groupId>io.netty</groupId>
+        <artifactId>netty-all</artifactId>
+        <version>${netty-all.version}</version>
+        <scope>test</scope>
+      </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-core</artifactId>

Reply via email to