This is an automated email from the ASF dual-hosted git repository.
rgoers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/logging-log4j2.git
The following commit(s) were added to refs/heads/master by this push:
new a29779b556 LOG4J2-3536 - Upgrade Flume Appender to Flume 1.10.0
a29779b556 is described below
commit a29779b55614c79e9b70644125333cf124fc6b6b
Author: Ralph Goers <[email protected]>
AuthorDate: Tue Jul 5 00:47:45 2022 -0700
LOG4J2-3536 - Upgrade Flume Appender to Flume 1.10.0
---
log4j-flume-ng/pom.xml | 13 ++++----
.../flume/appender/FlumeEmbeddedAgentTest.java | 34 +++++++++++++--------
.../flume/appender/FlumeEmbeddedAppenderTest.java | 35 ++++++++++++++--------
.../appender/FlumePersistentAppenderTest.java | 33 +++++++++++++-------
.../log4j/flume/appender/FlumePersistentPerf.java | 34 +++++++++++++--------
pom.xml | 9 +++++-
6 files changed, 101 insertions(+), 57 deletions(-)
diff --git a/log4j-flume-ng/pom.xml b/log4j-flume-ng/pom.xml
index 6bc60c9da2..bf29ed14c0 100644
--- a/log4j-flume-ng/pom.xml
+++ b/log4j-flume-ng/pom.xml
@@ -83,14 +83,6 @@
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
</dependency>
- <dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-core-asl</artifactId>
- </dependency>
- <dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- </dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
@@ -111,6 +103,11 @@
<artifactId>hadoop-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git
a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java
b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java
index df3c8eb3de..d9cc295264 100644
---
a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java
+++
b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java
@@ -34,9 +34,8 @@ import java.util.zip.GZIPInputStream;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import org.apache.avro.AvroRemoteException;
-import org.apache.avro.ipc.NettyServer;
-import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.netty.NettyServer;
+import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
@@ -60,6 +59,8 @@ import org.junit.Test;
import com.google.common.base.Preconditions;
+import static org.junit.Assert.fail;
+
/**
*
*/
@@ -226,17 +227,27 @@ public class FlumeEmbeddedAgentTest {
private static class EventCollector implements AvroSourceProtocol {
private final LinkedBlockingQueue<AvroFlumeEvent> eventQueue = new
LinkedBlockingQueue<>();
- private final NettyServer nettyServer;
-
+ private Server server;
public EventCollector(final int port) {
- final Responder responder = new
SpecificResponder(AvroSourceProtocol.class, this);
- nettyServer = new NettyServer(responder, new
InetSocketAddress(HOSTNAME, port));
- nettyServer.start();
+ try {
+ server = createServer(this, port);
+ } catch (InterruptedException ex) {
+ fail("Server creation was interrrupted");
+ }
+ server.start();
+ }
+
+ private Server createServer(AvroSourceProtocol protocol, final int
port) throws InterruptedException {
+
+ server = new NettyServer(new
SpecificResponder(AvroSourceProtocol.class, protocol),
+ new InetSocketAddress(HOSTNAME, port));
+
+ return server;
}
public void stop() {
- nettyServer.close();
+ server.close();
}
public Event poll() {
@@ -255,14 +266,13 @@ public class FlumeEmbeddedAgentTest {
}
@Override
- public Status append(final AvroFlumeEvent event) throws
AvroRemoteException {
+ public Status append(final AvroFlumeEvent event) {
eventQueue.add(event);
return Status.OK;
}
@Override
- public Status appendBatch(final List<AvroFlumeEvent> events)
- throws AvroRemoteException {
+ public Status appendBatch(final List<AvroFlumeEvent> events) {
Preconditions.checkState(eventQueue.addAll(events));
return Status.OK;
}
diff --git
a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java
b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java
index 5a01f0fbd3..f4afe1bedd 100644
---
a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java
+++
b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java
@@ -34,9 +34,8 @@ import java.util.zip.GZIPInputStream;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import org.apache.avro.AvroRemoteException;
-import org.apache.avro.ipc.NettyServer;
-import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.Server;
+import org.apache.avro.ipc.netty.NettyServer;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
@@ -61,6 +60,8 @@ import org.junit.Test;
import com.google.common.base.Preconditions;
+import static org.junit.Assert.fail;
+
/**
*
*/
@@ -256,18 +257,27 @@ public class FlumeEmbeddedAppenderTest {
private static class EventCollector implements AvroSourceProtocol {
private final LinkedBlockingQueue<AvroFlumeEvent> eventQueue = new
LinkedBlockingQueue<>();
- private final NettyServer nettyServer;
-
+ private Server server;
public EventCollector(final int port) {
- final Responder responder = new
SpecificResponder(AvroSourceProtocol.class, this);
- System.out.println("Collector listening on port " + port);
- nettyServer = new NettyServer(responder, new
InetSocketAddress(HOSTNAME, port));
- nettyServer.start();
+ try {
+ server = createServer(this, port);
+ } catch (InterruptedException ex) {
+ fail("Server creation was interrrupted");
+ }
+ server.start();
+ }
+
+ private Server createServer(AvroSourceProtocol protocol, final int
port) throws InterruptedException {
+
+ server = new NettyServer(new
SpecificResponder(AvroSourceProtocol.class, protocol),
+ new InetSocketAddress(HOSTNAME, port));
+
+ return server;
}
public void stop() {
- nettyServer.close();
+ server.close();
}
public Event poll() {
@@ -286,14 +296,13 @@ public class FlumeEmbeddedAppenderTest {
}
@Override
- public Status append(final AvroFlumeEvent event) throws
AvroRemoteException {
+ public Status append(final AvroFlumeEvent event) {
eventQueue.add(event);
return Status.OK;
}
@Override
- public Status appendBatch(final List<AvroFlumeEvent> events)
- throws AvroRemoteException {
+ public Status appendBatch(final List<AvroFlumeEvent> events) {
Preconditions.checkState(eventQueue.addAll(events));
return Status.OK;
}
diff --git
a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java
b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java
index cd856f28fb..8a925d407a 100644
---
a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java
+++
b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java
@@ -36,9 +36,8 @@ import java.util.zip.GZIPInputStream;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import org.apache.avro.AvroRemoteException;
-import org.apache.avro.ipc.NettyServer;
-import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.Server;
+import org.apache.avro.ipc.netty.NettyServer;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
@@ -64,6 +63,8 @@ import org.junit.Test;
import com.google.common.base.Preconditions;
+import static org.junit.Assert.fail;
+
/**
*
*/
@@ -406,17 +407,27 @@ public class FlumePersistentAppenderTest {
private static class EventCollector implements AvroSourceProtocol {
private final LinkedBlockingQueue<AvroFlumeEvent> eventQueue = new
LinkedBlockingQueue<>();
- private final NettyServer nettyServer;
-
+ private Server server;
public EventCollector(final int port) {
- final Responder responder = new
SpecificResponder(AvroSourceProtocol.class, this);
- nettyServer = new NettyServer(responder, new
InetSocketAddress(HOSTNAME, port));
- nettyServer.start();
+ try {
+ server = createServer(this, port);
+ } catch (InterruptedException ex) {
+ fail("Server creation was interrrupted");
+ }
+ server.start();
+ }
+
+ private Server createServer(AvroSourceProtocol protocol, final int
port) throws InterruptedException {
+
+ server = new NettyServer(new
SpecificResponder(AvroSourceProtocol.class, protocol),
+ new InetSocketAddress(HOSTNAME, port));
+
+ return server;
}
public void stop() {
- nettyServer.close();
+ server.close();
}
public Event poll() {
@@ -435,14 +446,14 @@ public class FlumePersistentAppenderTest {
}
@Override
- public Status append(final AvroFlumeEvent event) throws
AvroRemoteException {
+ public Status append(final AvroFlumeEvent event) {
eventQueue.add(event);
//System.out.println("Received event " +
event.getHeaders().get(new org.apache.avro.util.Utf8(FlumeEvent.GUID)));
return Status.OK;
}
@Override
- public Status appendBatch(final List<AvroFlumeEvent> events) throws
AvroRemoteException {
+ public Status appendBatch(final List<AvroFlumeEvent> events) {
Preconditions.checkState(eventQueue.addAll(events));
for (final AvroFlumeEvent event : events) {
// System.out.println("Received event " +
event.getHeaders().get(new org.apache.avro.util.Utf8(FlumeEvent.GUID)));
diff --git
a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentPerf.java
b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentPerf.java
index 9dd0eae2ac..e2b19aa9b6 100644
---
a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentPerf.java
+++
b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentPerf.java
@@ -34,9 +34,8 @@ import java.util.zip.GZIPInputStream;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import org.apache.avro.AvroRemoteException;
-import org.apache.avro.ipc.NettyServer;
-import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.Server;
+import org.apache.avro.ipc.netty.NettyServer;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
@@ -57,6 +56,8 @@ import org.junit.Test;
import com.google.common.base.Preconditions;
+import static org.junit.Assert.fail;
+
/**
*
*/
@@ -167,17 +168,27 @@ public class FlumePersistentPerf {
private static class EventCollector implements AvroSourceProtocol {
private final LinkedBlockingQueue<AvroFlumeEvent> eventQueue = new
LinkedBlockingQueue<>();
- private final NettyServer nettyServer;
-
+ private Server server;
public EventCollector(final int port) {
- final Responder responder = new
SpecificResponder(AvroSourceProtocol.class, this);
- nettyServer = new NettyServer(responder, new
InetSocketAddress(HOSTNAME, port));
- nettyServer.start();
+ try {
+ server = createServer(this, port);
+ } catch (InterruptedException ex) {
+ fail("Server creation was interrrupted");
+ }
+ server.start();
+ }
+
+ private Server createServer(AvroSourceProtocol protocol, final int
port) throws InterruptedException {
+
+ server = new NettyServer(new
SpecificResponder(AvroSourceProtocol.class, protocol),
+ new InetSocketAddress(HOSTNAME, port));
+
+ return server;
}
public void stop() {
- nettyServer.close();
+ server.close();
}
public Event poll() {
@@ -196,14 +207,13 @@ public class FlumePersistentPerf {
}
@Override
- public Status append(final AvroFlumeEvent event) throws
AvroRemoteException {
+ public Status append(final AvroFlumeEvent event) {
eventQueue.add(event);
return Status.OK;
}
@Override
- public Status appendBatch(final List<AvroFlumeEvent> events)
- throws AvroRemoteException {
+ public Status appendBatch(final List<AvroFlumeEvent> events) {
Preconditions.checkState(eventQueue.addAll(events));
return Status.OK;
}
diff --git a/pom.xml b/pom.xml
index cd1ae3f503..62d3d54e9e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -228,7 +228,7 @@
<spring-boot.version>2.6.7</spring-boot.version>
<springVersion>5.3.19</springVersion>
<kubernetes-client.version>4.6.1</kubernetes-client.version>
- <flumeVersion>1.9.0</flumeVersion>
+ <flumeVersion>1.10.0</flumeVersion>
<disruptorVersion>3.4.4</disruptorVersion>
<conversantDisruptorVersion>1.2.19</conversantDisruptorVersion>
<mongodb3.version>3.12.4</mongodb3.version>
@@ -290,6 +290,7 @@
<argLine>-Xms256m -Xmx1024m</argLine>
<javaTargetVersion>11</javaTargetVersion>
<module.name />
+ <netty-all.version>4.1.72.Final</netty-all.version>
</properties>
<pluginRepositories>
<pluginRepository>
@@ -518,6 +519,12 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ <version>${netty-all.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>