This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/karaf-decanter.git
The following commit(s) were added to refs/heads/main by this push:
new 9d7058d3 Improvements on log4j-socket collector, defining the
listening hostname and JEP 290 implementation (#564)
9d7058d3 is described below
commit 9d7058d37160a16aaad34b488170adf277351e8b
Author: JB Onofré <[email protected]>
AuthorDate: Sun Nov 9 08:54:54 2025 +0100
Improvements on log4j-socket collector, defining the listening hostname and
JEP 290 implementation (#564)
* Improvements on log4j-socket collector, defining the listening hostname
and JEP 290 implementation
* Fix test and implement authentication
---
....apache.karaf.decanter.collector.log.socket.cfg | 5 +
.../collector/log/socket/SocketCollector.java | 162 ++++++++++++++++++---
.../collector/log/socket/SocketCollectorTest.java | 136 ++++++++++++++++-
3 files changed, 278 insertions(+), 25 deletions(-)
diff --git
a/collector/log4j-socket/src/main/cfg/org.apache.karaf.decanter.collector.log.socket.cfg
b/collector/log4j-socket/src/main/cfg/org.apache.karaf.decanter.collector.log.socket.cfg
index bc6b31f3..10162f21 100644
---
a/collector/log4j-socket/src/main/cfg/org.apache.karaf.decanter.collector.log.socket.cfg
+++
b/collector/log4j-socket/src/main/cfg/org.apache.karaf.decanter.collector.log.socket.cfg
@@ -23,3 +23,8 @@
#port=4560
#workers=10
+#hostname=localhost
+#backlog=50
+# if username and password are set, authentication is enabled
+#username=admin
+#password=secret
diff --git
a/collector/log4j-socket/src/main/java/org/apache/karaf/decanter/collector/log/socket/SocketCollector.java
b/collector/log4j-socket/src/main/java/org/apache/karaf/decanter/collector/log/socket/SocketCollector.java
index ce79648b..ed811905 100644
---
a/collector/log4j-socket/src/main/java/org/apache/karaf/decanter/collector/log/socket/SocketCollector.java
+++
b/collector/log4j-socket/src/main/java/org/apache/karaf/decanter/collector/log/socket/SocketCollector.java
@@ -18,15 +18,18 @@ package org.apache.karaf.decanter.collector.log.socket;
import java.io.BufferedInputStream;
import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
-import java.io.InvalidClassException;
+import java.io.ObjectInputFilter;
import java.io.ObjectInputStream;
-import java.io.ObjectStreamClass;
+import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.Map;
@@ -56,8 +59,12 @@ import org.slf4j.LoggerFactory;
)
public class SocketCollector implements Closeable, Runnable {
+ public static final String HOSTNAME = "hostname";
public static final String PORT_NAME = "port";
+ public static final String BACKLOG = "backlog";
public static final String WORKERS_NAME = "workers";
+ public static final String USERNAME = "username";
+ public static final String PASSWORD = "password";
@Reference
public EventAdmin dispatcher;
@@ -68,14 +75,16 @@ public class SocketCollector implements Closeable, Runnable
{
private ExecutorService executor;
private Dictionary<String, Object> properties;
- @SuppressWarnings("unchecked")
@Activate
public void activate(ComponentContext context) throws IOException {
this.properties = context.getProperties();
+ String hostname = getProperty(this.properties, HOSTNAME, "localhost");
int port = Integer.parseInt(getProperty(this.properties, PORT_NAME,
"4560"));
+ int backlog = Integer.parseInt(getProperty(this.properties, BACKLOG,
"50"));
int workers = Integer.parseInt(getProperty(this.properties,
WORKERS_NAME, "10"));
- LOGGER.info("Starting Log4j Socket collector on port {}", port);
- this.serverSocket = new ServerSocket(port);
+ LOGGER.info("Starting Log4j Socket collector on {}:{}", hostname,
port);
+ InetAddress host = InetAddress.getByName(hostname);
+ this.serverSocket = new ServerSocket(port, backlog, host);
// adding 1 for serverSocket handling
this.executor = Executors.newFixedThreadPool(workers + 1);
this.executor.execute(this);
@@ -201,16 +210,26 @@ public class SocketCollector implements Closeable,
Runnable {
}
public void run() {
- try (ObjectInputStream ois = new LoggingEventObjectInputStream(new
BufferedInputStream(clientSocket
- .getInputStream()))) {
- while (open) {
- try {
- Object event = ois.readObject();
- if (event instanceof LoggingEvent) {
- handleLog4j((LoggingEvent)event);
+ try {
+ InputStream socketInputStream = new
BufferedInputStream(clientSocket.getInputStream());
+
+ // Perform authentication if configured
+ if (!authenticate(socketInputStream)) {
+ LOGGER.warn("Authentication failed for client at {}",
clientSocket.getInetAddress());
+ return;
+ }
+
+ // After successful authentication, proceed with normal log
event processing
+ try (ObjectInputStream ois = new
LoggingEventObjectInputStream(socketInputStream)) {
+ while (open) {
+ try {
+ Object event = ois.readObject();
+ if (event instanceof LoggingEvent) {
+ handleLog4j((LoggingEvent)event);
+ }
+ } catch (ClassNotFoundException e) {
+ LOGGER.warn("Unable to deserialize event from " +
clientSocket.getInetAddress(), e);
}
- } catch (ClassNotFoundException e) {
- LOGGER.warn("Unable to deserialize event from " +
clientSocket.getInetAddress(), e);
}
}
} catch (EOFException e) {
@@ -224,29 +243,126 @@ public class SocketCollector implements Closeable,
Runnable {
LOGGER.info("Error closing socket", e);
}
}
+
+ /**
+ * Authenticates the client connection.
+ * Authentication protocol:
+ * 1. Client sends username length (int) followed by username (UTF-8
bytes)
+ * 2. Client sends password length (int) followed by password (UTF-8
bytes)
+ * 3. Server validates and sends acknowledgment: 1 (success) or 0
(failure)
+ *
+ * @param inputStream the input stream to read authentication data from
+ * @return true if authentication succeeds or is not required, false
otherwise
+ */
+ private boolean authenticate(InputStream inputStream) throws
IOException {
+ String configuredUsername = getProperty(properties, USERNAME,
null);
+ String configuredPassword = getProperty(properties, PASSWORD,
null);
+
+ // If no authentication is configured, allow connection
+ if (configuredUsername == null && configuredPassword == null) {
+ return true;
+ }
+
+ // If only one is configured, require both
+ if (configuredUsername == null || configuredPassword == null) {
+ LOGGER.warn("Both username and password must be configured for
authentication");
+ return false;
+ }
+
+ DataInputStream dis = new DataInputStream(inputStream);
+ DataOutputStream dos = new
DataOutputStream(clientSocket.getOutputStream());
+
+ try {
+ // Read username
+ int usernameLength = dis.readInt();
+ if (usernameLength < 0 || usernameLength > 1024) {
+ LOGGER.warn("Invalid username length from {}",
clientSocket.getInetAddress());
+ dos.writeByte(0); // Send failure
+ dos.flush();
+ return false;
+ }
+ byte[] usernameBytes = new byte[usernameLength];
+ dis.readFully(usernameBytes);
+ String username = new String(usernameBytes,
StandardCharsets.UTF_8);
+
+ // Read password
+ int passwordLength = dis.readInt();
+ if (passwordLength < 0 || passwordLength > 1024) {
+ LOGGER.warn("Invalid password length from {}",
clientSocket.getInetAddress());
+ dos.writeByte(0); // Send failure
+ dos.flush();
+ return false;
+ }
+ byte[] passwordBytes = new byte[passwordLength];
+ dis.readFully(passwordBytes);
+ String password = new String(passwordBytes,
StandardCharsets.UTF_8);
+
+ // Validate credentials
+ boolean authenticated = configuredUsername.equals(username) &&
configuredPassword.equals(password);
+
+ // Send acknowledgment
+ dos.writeByte(authenticated ? 1 : 0);
+ dos.flush();
+
+ if (authenticated) {
+ LOGGER.debug("Client authenticated successfully: {}",
username);
+ } else {
+ LOGGER.warn("Authentication failed for user '{}' from {}",
username, clientSocket.getInetAddress());
+ }
+
+ return authenticated;
+ } catch (EOFException e) {
+ LOGGER.debug("Client disconnected during authentication");
+ return false;
+ }
+ // Note: We don't close dis/dos here as the underlying streams are
still needed
+ }
}
private static class LoggingEventObjectInputStream extends
ObjectInputStream {
public LoggingEventObjectInputStream(InputStream is) throws
IOException {
super(is);
+ // JEP 290: Set ObjectInputFilter to filter incoming serialization
data
+ setObjectInputFilter(createLoggingEventFilter());
}
- @Override
- protected Class<?> resolveClass(ObjectStreamClass desc) throws
IOException, ClassNotFoundException {
- if (!isAllowedByDefault(desc.getName())) {
- throw new InvalidClassException("Unauthorized deserialization
attempt", desc.getName());
- }
- return super.resolveClass(desc);
+ /**
+ * Creates an ObjectInputFilter for JEP 290 that allows only the
classes
+ * necessary for Log4j LoggingEvent deserialization.
+ *
+ * Note: Based off the internals of LoggingEvent. Will need to be
+ * adjusted for Log4J 2
+ */
+ private static ObjectInputFilter createLoggingEventFilter() {
+ return new ObjectInputFilter() {
+ @Override
+ public Status checkInput(FilterInfo filterInfo) {
+ Class<?> clazz = filterInfo.serialClass();
+ if (clazz != null) {
+ String className = clazz.getName();
+ if (isAllowedByDefault(className)) {
+ return Status.ALLOWED;
+ } else {
+ return Status.REJECTED;
+ }
+ }
+ // Allow array depth and references checks
+ long arrayLength = filterInfo.arrayLength();
+ if (arrayLength >= 0 && arrayLength > Integer.MAX_VALUE) {
+ return Status.REJECTED;
+ }
+ return Status.UNDECIDED;
+ }
+ };
}
- // Note: Based off the internals of LoggingEvent. Will need to be
- // adjusted for Log4J 2
private static boolean isAllowedByDefault(final String name) {
return name.startsWith("java.lang.")
|| name.startsWith("[Ljava.lang.")
|| name.startsWith("org.apache.log4j.")
- || name.equals("java.util.Hashtable");
+ || name.startsWith("java.util.Hashtable")
+ || name.startsWith("[Ljava.util.Map");
}
}
}
diff --git
a/collector/log4j-socket/src/test/java/org/apache/karaf/decanter/collector/log/socket/SocketCollectorTest.java
b/collector/log4j-socket/src/test/java/org/apache/karaf/decanter/collector/log/socket/SocketCollectorTest.java
index 788d2db3..fc6cda26 100644
---
a/collector/log4j-socket/src/test/java/org/apache/karaf/decanter/collector/log/socket/SocketCollectorTest.java
+++
b/collector/log4j-socket/src/test/java/org/apache/karaf/decanter/collector/log/socket/SocketCollectorTest.java
@@ -18,10 +18,12 @@ package org.apache.karaf.decanter.collector.log.socket;
import static org.junit.Assert.assertEquals;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Dictionary;
import java.util.Hashtable;
@@ -37,7 +39,6 @@ import org.apache.log4j.spi.ThrowableInformation;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
@@ -90,7 +91,6 @@ public class SocketCollectorTest {
}
@Test
- @Ignore("Works fine with JDK11 but not with JDK8 after
maven-surefire-plugin 2.22.2 update")
public void testUnknownEvent() throws Exception {
activate();
sendEventOnSocket(new UnknownClass());
@@ -98,6 +98,35 @@ public class SocketCollectorTest {
assertEquals("Event(s) should have been correctly handled", 0,
eventAdmin.getPostEvents().size());
}
+ @Test
+ public void testDeepObject() throws Exception {
+ activate();
+ sendEventOnSocket(getMaliciousSerializableDictionaryDemo());
+ waitUntilEventCountHandled(1);
+ assertEquals(0, eventAdmin.getPostEvents().size());
+ }
+
+ public static Object getMaliciousSerializableDictionaryDemo() {
+ Dictionary hashtable = new Hashtable();
+ Dictionary s1 = hashtable;
+ Dictionary s2 = new
+ Hashtable();
+ for (int i = 0; i < 100; i++) {
+ Dictionary t1 = new Hashtable();
+ Dictionary t2 = new Hashtable();
+
t1.put("afdsgasdgfasdgasdfafdsgasdgfasdgasdfafdsgasdgfasdgasdfafdsgasdgfasdgasdf",
+
"afdsgasdgfasdgasdfafdsgasdgfasdgasdfafdsgasdgfasdgasdfafdsgasdgfasdgasdfafdsgasdgfasdgasdfafdsgasdgfasdgasdfafdsgasdgfasdgasdfafdsgasdgfasdgasdfafdsgasdgfasdgasdfafdsgasdgfasdgasdfafdsgasdgfasdgasdfafdsgasdgfasdgasdfafdsgasdgfasdgasdfafdsgasdgfasdgasdfafdsgasdgfasdgasdfafdsgasdgfasdgasdfafdsgasdgfasdgasdfafdsgasdgfasdgasdfafdsgasdgfasdgasdfafdsgasdgfasdgasdfafdsgasdgfasdgasdfafdsgasdgfasdgasdfafdsgasdgfasdgasdf");
+ t2.put("test",
"test112312test1123123test1123123test1123123test1123123test1123123test11231233");
+ s1.put(t1, t2);
+ s1.put(t2, t1);
+ s2.put(t2, t1);
+ s2.put(t1, t2);
+ s1 = t1;
+ s2 = t2;
+ }
+ return (Object) hashtable;
+ }
+
private static final class UnknownClass implements java.io.Serializable {
String someValue = "12345";
@@ -136,6 +165,80 @@ public class SocketCollectorTest {
assertEquals("Event(s) should have been correctly handled", 2,
eventAdmin.getPostEvents().size());
}
+ /**
+ * Test authentication with correct credentials
+ */
+ @Test
+ public void testAuthenticationSuccess() throws Exception {
+ componentContext.getProperties().put(SocketCollector.USERNAME,
"testuser");
+ componentContext.getProperties().put(SocketCollector.PASSWORD,
"testpass");
+ activate();
+
+ sendAuthenticatedEventOnSocket(newLoggingEvent("Authenticated
message"), "testuser", "testpass");
+ waitUntilEventCountHandled(1);
+ assertEquals("Event should have been handled after successful
authentication", 1, eventAdmin.getPostEvents().size());
+ }
+
+ /**
+ * Test authentication with incorrect credentials
+ */
+ @Test
+ public void testAuthenticationFailure() throws Exception {
+ componentContext.getProperties().put(SocketCollector.USERNAME,
"testuser");
+ componentContext.getProperties().put(SocketCollector.PASSWORD,
"testpass");
+ activate();
+
+ try {
+ sendAuthenticatedEventOnSocket(newLoggingEvent("Should not be
processed"), "testuser", "wrongpass");
+ // If we get here, authentication didn't fail as expected
+ Assert.fail("Authentication should have failed with wrong
password");
+ } catch (IOException e) {
+ // Expected - authentication failed
+ Assert.assertTrue("Exception should indicate authentication
failure",
+ e.getMessage() != null &&
e.getMessage().contains("Authentication failed"));
+ }
+
+ // Wait a bit to ensure no events were processed
+ Thread.sleep(100);
+ assertEquals("No events should have been processed after
authentication failure", 0, eventAdmin.getPostEvents().size());
+ }
+
+ /**
+ * Test that authentication is optional (backward compatibility)
+ */
+ @Test
+ public void testNoAuthentication() throws Exception {
+ // Don't set username/password
+ activate();
+
+ // Should work without authentication
+ sendEventOnSocket(newLoggingEvent("No auth message"));
+ waitUntilEventCountHandled(1);
+ assertEquals("Event should have been handled without authentication",
1, eventAdmin.getPostEvents().size());
+ }
+
+ /**
+ * Test authentication with wrong username
+ */
+ @Test
+ public void testAuthenticationWrongUsername() throws Exception {
+ componentContext.getProperties().put(SocketCollector.USERNAME,
"testuser");
+ componentContext.getProperties().put(SocketCollector.PASSWORD,
"testpass");
+ activate();
+
+ try {
+ sendAuthenticatedEventOnSocket(newLoggingEvent("Should not be
processed"), "wronguser", "testpass");
+ Assert.fail("Authentication should have failed with wrong
username");
+ } catch (IOException e) {
+ // Expected - authentication failed
+ Assert.assertTrue("Exception should indicate authentication
failure",
+ e.getMessage() != null &&
e.getMessage().contains("Authentication failed"));
+ }
+
+ Thread.sleep(100);
+ assertEquals("No events should have been processed after
authentication failure", 0, eventAdmin.getPostEvents().size());
+ }
+
private void waitUntilEventCountHandled(int eventCount) throws
InterruptedException {
long timeout = 20000L;
long start = System.currentTimeMillis();
@@ -173,6 +276,35 @@ public class SocketCollectorTest {
}
}
+ private void sendAuthenticatedEventOnSocket(Object event, String username,
String password) throws IOException {
+ try (Socket socket = new Socket()) {
+ socket.connect(new InetSocketAddress("localhost", port), 5000);
+ DataOutputStream dos = new
DataOutputStream(socket.getOutputStream());
+
+ // Send authentication
+ byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8);
+ dos.writeInt(usernameBytes.length);
+ dos.write(usernameBytes);
+
+ byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8);
+ dos.writeInt(passwordBytes.length);
+ dos.write(passwordBytes);
+ dos.flush();
+
+ // Read authentication response
+ int response = socket.getInputStream().read();
+ if (response != 1) {
+ throw new IOException("Authentication failed, server returned:
" + response);
+ }
+
+ // Send event using ObjectOutputStream (it will write its header)
+ try (ObjectOutputStream out = new
ObjectOutputStream(socket.getOutputStream())) {
+ out.writeObject(event);
+ out.flush();
+ }
+ }
+ }
+
/**
* Stub used only for this unit test
*/