This is an automated email from the ASF dual-hosted git repository. vy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/logging-log4j2.git
commit 08b2946d3619f70887b9e528a0d09940a4c7294e Author: Volkan Yazıcı <vol...@yazi.ci> AuthorDate: Sun Oct 24 20:40:38 2021 +0200 LOG4J2-2829 SocketAppender should propagate failures when reconnection fails. (#591) Certain `SocketAppender` tests are rewritten and the redundant/disabled ones are removed. --- log4j-core/pom.xml | 5 + .../logging/log4j/core/net/TcpSocketManager.java | 5 +- .../core/net/SocketAppenderReconnectTest.java | 401 +++++++++++++++++++++ .../log4j/core/net/SocketMessageLossTest.java | 144 -------- .../log4j/core/net/SocketReconnectTest.java | 323 ----------------- .../apache/logging/log4j/core/net/SocketTest.java | 82 ----- src/changes/changes.xml | 3 + 7 files changed, 413 insertions(+), 550 deletions(-) diff --git a/log4j-core/pom.xml b/log4j-core/pom.xml index 0d5c9c9..43286d4 100644 --- a/log4j-core/pom.xml +++ b/log4j-core/pom.xml @@ -287,6 +287,11 @@ <artifactId>HdrHistogram</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <scope>test</scope> + </dependency> <!-- For testing log4j 2 2.x plugins --> <dependency> <groupId>com.vlkan.log4j2</groupId> diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/net/TcpSocketManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/net/TcpSocketManager.java index fad8870..d80375f 100644 --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/net/TcpSocketManager.java +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/net/TcpSocketManager.java @@ -157,8 +157,8 @@ public class TcpSocketManager extends AbstractSocketManager { try { writeAndFlush(bytes, offset, length, immediateFlush); } catch (final IOException causeEx) { + final String config = inetAddress + ":" + port; if (retry && reconnector == null) { - final String config = inetAddress + ":" + port; reconnector = createReconnector(); try { reconnector.reconnect(); @@ -177,7 +177,10 @@ public class TcpSocketManager extends AbstractSocketManager { config), causeEx); } + return; } + final String message = String.format("Error writing to %s for connection %s", getName(), config); + throw new AppenderLoggingException(message, causeEx); } } } diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/net/SocketAppenderReconnectTest.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/net/SocketAppenderReconnectTest.java new file mode 100644 index 0000000..e007822 --- /dev/null +++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/net/SocketAppenderReconnectTest.java @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.logging.log4j.core.net; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.appender.AppenderLoggingException; +import org.apache.logging.log4j.core.config.Configuration; +import org.apache.logging.log4j.core.config.Configurator; +import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilder; +import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilderFactory; +import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration; +import org.apache.logging.log4j.core.net.TcpSocketManager.HostResolver; +import org.apache.logging.log4j.status.StatusLogger; +import org.junit.jupiter.api.Test; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.*; + +/** + * Tests reconnection support of {@link org.apache.logging.log4j.core.appender.SocketAppender}. + */ +class SocketAppenderReconnectTest { + + private static final Logger LOGGER = StatusLogger.getLogger(); + + /** + * Tests if failures are propagated when reconnection fails. + * + * @see <a href="https://issues.apache.org/jira/browse/LOG4J2-2829">LOG4J2-2829</a> + */ + @Test + void repeating_reconnect_failures_should_be_propagated() throws Exception { + try (final LineReadingTcpServer server = new LineReadingTcpServer()) { + + // Start the server. + server.start("Main", 0); + final int port = server.serverSocket.getLocalPort(); + + // Initialize the logger context. + final LoggerContext loggerContext = initContext(port); + try { + + // Verify the initial working state. + verifyLoggingSuccess(server); + + // Stop the server, and verify the logging failure. + server.close(); + verifyLoggingFailure(); + + // Start the server again, and verify the logging success. + server.start("Main", port); + verifyLoggingSuccess(server); + + } + + // Shutdown the logger context. + finally { + Configurator.shutdown(loggerContext); + } + + } + } + + /** + * Tests if all the {@link InetSocketAddress}es returned by an {@link HostResolver} is used for fallback on reconnect attempts. + */ + @Test + void reconnect_should_fallback_when_there_are_multiple_resolved_hosts() throws Exception { + try (final LineReadingTcpServer primaryServer = new LineReadingTcpServer(); + final LineReadingTcpServer secondaryServer = new LineReadingTcpServer()) { + + // Start servers. + primaryServer.start("Primary", 0); + secondaryServer.start("Secondary", 0); + + // Mock the host resolver. + final FixedHostResolver hostResolver = FixedHostResolver.ofServers(primaryServer, secondaryServer); + TcpSocketManager.setHostResolver(hostResolver); + try { + + // Initialize the logger context. + final LoggerContext loggerContext = initContext( + // Passing an invalid port, since the resolution is supposed to be performed by the mocked host resolver anyway. + 0); + try { + + // Verify the initial working state on the primary server. + verifyLoggingSuccess(primaryServer); + + // Stop the primary server, and verify the logging success due to fallback on to the secondary server. + primaryServer.close(); + verifyLoggingSuccess(secondaryServer); + + } + + // Shutdown the logger context. + finally { + Configurator.shutdown(loggerContext); + } + + } finally { + // Reset the host resolver. + TcpSocketManager.setHostResolver(new HostResolver()); + } + + } + } + + private static LoggerContext initContext(final int port) { + + // Create the configuration builder. + final ConfigurationBuilder<BuiltConfiguration> configBuilder = ConfigurationBuilderFactory + .newConfigurationBuilder() + .setStatusLevel(Level.ERROR) + .setConfigurationName(SocketAppenderReconnectTest.class.getSimpleName()); + + // Create the configuration. + final String appenderName = "Socket"; + final Configuration config = configBuilder + .add(configBuilder + .newAppender(appenderName, "SOCKET") + .addAttribute("host", "localhost") + .addAttribute("port", String.valueOf(port)) + .addAttribute("protocol", Protocol.TCP) + .addAttribute("ignoreExceptions", false) + .addAttribute("reconnectionDelayMillis", 10) + .addAttribute("immediateFlush", true) + .add(configBuilder + .newLayout("PatternLayout") + .addAttribute("pattern", "%m%n"))) + .add(configBuilder.newLogger("org.apache.logging.log4j", Level.DEBUG)) + .add(configBuilder + .newRootLogger(Level.ERROR) + .add(configBuilder.newAppenderRef(appenderName))) + .build(false); + + // Initialize the configuration. + return Configurator.initialize(config); + + } + + private static void verifyLoggingSuccess(final LineReadingTcpServer server) throws Exception { + final int messageCount = 100; + // noinspection ConstantConditions + assertTrue(messageCount > 1, "was expecting messageCount to be bigger than 1 due to LOG4J2-2829, found: " + messageCount); + final List<String> expectedMessages = IntStream + .range(0, messageCount) + .mapToObj(messageIndex -> String.format("m%02d", messageIndex)) + .collect(Collectors.toList()); + final Logger logger = LogManager.getLogger(); + for (int messageIndex = 0; messageIndex < expectedMessages.size(); messageIndex++) { + final String message = expectedMessages.get(messageIndex); + // Due to socket initialization, the first write() might need some extra effort. + if (messageIndex == 0) { + await() + .pollInterval(100, TimeUnit.MILLISECONDS) + .atMost(2, TimeUnit.SECONDS) + .until(() -> { + logger.info(message); + return true; + }); + } else { + logger.info(message); + } + } + expectedMessages.forEach(logger::info); + final List<String> actualMessages = server.pollLines(messageCount); + assertEquals(expectedMessages, actualMessages); + } + + private static void verifyLoggingFailure() { + final Logger logger = LogManager.getLogger(); + int retryCount = 3; + // noinspection ConstantConditions + assertTrue(retryCount > 1, "was expecting retryCount to be bigger than 1 due to LOG4J2-2829, found: " + retryCount); + for (int i = 0; i < retryCount; i++) { + try { + logger.info("should fail #" + i); + fail("should have failed #" + i); + } catch (final AppenderLoggingException ignored) {} + } + } + + /** + * A simple TCP server implementation reading the accepted connection's input stream into a blocking queue of lines. + * <p> + * The implementation is thread-safe, yet connections are handled sequentially, i.e., no parallelization. + * The input stream of the connection is decoded in UTF-8. + * </p> + */ + private static final class LineReadingTcpServer implements AutoCloseable { + + private volatile boolean running = false; + + private ServerSocket serverSocket = null; + + private Socket clientSocket = null; + + private Thread readerThread = null; + + private final BlockingQueue<String> lines = new LinkedBlockingQueue<>(); + + private LineReadingTcpServer() {} + + private synchronized void start(final String name, final int port) throws IOException { + if (!running) { + running = true; + serverSocket = createServerSocket(port); + readerThread = createReaderThread(name); + } + } + + private ServerSocket createServerSocket(final int port) throws IOException { + final ServerSocket serverSocket = new ServerSocket(port); + serverSocket.setReuseAddress(true); + serverSocket.setSoTimeout(0); // Zero indicates accept() will block indefinitely. + return serverSocket; + } + + private Thread createReaderThread(final String name) { + final String threadName = "LineReadingTcpSocketServerReader-" + name; + final Thread thread = new Thread(this::acceptClients, threadName); + thread.setDaemon(true); // Avoid blocking JVM exit. + thread.setUncaughtExceptionHandler((ignored, error) -> + LOGGER.error("uncaught reader thread exception", error)); + thread.start(); + return thread; + } + + private void acceptClients() { + try { + while (running) { + acceptClient(); + } + } catch (final Exception error) { + LOGGER.error("failed accepting client connections", error); + } + } + + private void acceptClient() throws Exception { + + // Accept the client connection. + final Socket clientSocket; + try { + clientSocket = serverSocket.accept(); + } catch (SocketException ignored) { + return; + } + clientSocket.setSoLinger(true, 0); // Enable immediate forceful close. + synchronized (this) { + if (running) { + this.clientSocket = clientSocket; + } + } + + // Read from the client. + try (final InputStream clientInputStream = clientSocket.getInputStream(); + final InputStreamReader clientReader = new InputStreamReader(clientInputStream, StandardCharsets.UTF_8); + final BufferedReader clientBufferedReader = new BufferedReader(clientReader)) { + while (running) { + final String line = clientBufferedReader.readLine(); + if (line == null) { + break; + } + lines.put(line); + } + } + + // Ignore connection failures. + catch (final SocketException ignored) {} + + // Clean up the client connection. + finally { + try { + synchronized (this) { + if (!clientSocket.isClosed()) { + clientSocket.shutdownOutput(); + clientSocket.close(); + } + this.clientSocket = null; + } + } catch (final Exception error) { + LOGGER.error("failed closing client socket", error); + } + } + + } + + @Override + public void close() throws Exception { + + // Stop the reader, if running. + Thread stoppedReaderThread = null; + synchronized (this) { + if (running) { + running = false; + // acceptClient() might have closed the client socket due to a connection failure and haven't created a new one yet. + // Hence, here we double-check if the client connection is in place. + if (clientSocket != null && !clientSocket.isClosed()) { + // Interrupting a thread is not sufficient to unblock operations waiting on socket I/O: https://stackoverflow.com/a/4426050/1278899 + // Hence, here we close the client socket to unblock the read from the client socket. + clientSocket.close(); + } + serverSocket.close(); + stoppedReaderThread = readerThread; + clientSocket = null; + serverSocket = null; + readerThread = null; + } + } + + // We wait for the termination of the reader thread outside the synchronized block. + // Otherwise, there is a chance of deadlock with this join() and the synchronized block inside the acceptClient(). + if (stoppedReaderThread != null) { + stoppedReaderThread.join(); + } + + } + + private List<String> pollLines(@SuppressWarnings("SameParameterValue") final int count) throws InterruptedException, TimeoutException { + final List<String> polledLines = new ArrayList<>(count); + for (int i = 0; i < count; i++) { + final String polledLine = pollLine(); + polledLines.add(polledLine); + } + return polledLines; + } + + private String pollLine() throws InterruptedException, TimeoutException { + final String line = lines.poll(2, TimeUnit.SECONDS); + if (line == null) { + throw new TimeoutException(); + } + return line; + } + + } + + /** + * {@link HostResolver} implementation always resolving to the given list of {@link #addresses}. + */ + private static final class FixedHostResolver extends HostResolver { + + private final List<InetSocketAddress> addresses; + + private FixedHostResolver(List<InetSocketAddress> addresses) { + this.addresses = addresses; + } + + private static FixedHostResolver ofServers(LineReadingTcpServer... servers) { + List<InetSocketAddress> addresses = Arrays + .stream(servers) + .map(server -> (InetSocketAddress) server.serverSocket.getLocalSocketAddress()) + .collect(Collectors.toList()); + return new FixedHostResolver(addresses); + } + + @Override + public List<InetSocketAddress> resolveHost(String ignoredHost, int ignoredPort) { + return addresses; + } + + } + +} diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/net/SocketMessageLossTest.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/net/SocketMessageLossTest.java deleted file mode 100644 index c4eff4d..0000000 --- a/log4j-core/src/test/java/org/apache/logging/log4j/core/net/SocketMessageLossTest.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache license, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the license for the specific language governing permissions and - * limitations under the license. - */ -package org.apache.logging.log4j.core.net; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.net.ServerSocket; -import java.net.Socket; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.core.LoggerContext; -import org.apache.logging.log4j.core.appender.AppenderLoggingException; -import org.apache.logging.log4j.core.test.junit.LoggerContextSource; -import org.apache.logging.log4j.core.test.AvailablePortFinder; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.*; - -@Disabled("Currently needs better port choosing support") -@LoggerContextSource("log4j-socket2.xml") -public class SocketMessageLossTest { - private static final int SOCKET_PORT = AvailablePortFinder.getNextAvailable(); - - @Test - public void testSocket(final LoggerContext context) throws Exception { - TestSocketServer testServer; - ExecutorService executor = null; - Future<InputStream> futureIn; - - try { - executor = Executors.newSingleThreadExecutor(); - System.err.println("Initializing server"); - testServer = new TestSocketServer(); - futureIn = executor.submit(testServer); - - //System.err.println("Initializing logger"); - final Logger logger = context.getLogger(getClass().getName()); - - String message = "Log #1"; - logger.error(message); - - final BufferedReader reader = new BufferedReader(new InputStreamReader(futureIn.get())); - assertEquals(message, reader.readLine()); - - //System.err.println("Closing server"); - closeQuietly(testServer); - assertTrue(testServer.server.isClosed(), "Server not shutdown"); - - //System.err.println("Sleeping to ensure no race conditions"); - Thread.sleep(1000); - - message = "Log #2"; - try { - logger.error(message); - fail("Expected exception not thrown"); - } catch (final AppenderLoggingException e) { - // An exception is expected. - } - - message = "Log #3"; - try { - logger.error(message); - fail("Expected exception not thrown"); - } catch (final AppenderLoggingException e) { - // An exception is expected. - } - } finally { - closeQuietly(executor); - } - } - - - private static class TestSocketServer implements Callable<InputStream> { - private final ServerSocket server; - private Socket client; - - public TestSocketServer() throws Exception { - server = new ServerSocket(SOCKET_PORT); - } - - @Override - public InputStream call() throws Exception { - client = server.accept(); - return client.getInputStream(); - } - - public void close() { - closeQuietly(client); - closeQuietly(server); - } - - private static void closeQuietly(final ServerSocket socket) { - if (null != socket) { - try { - socket.close(); - } catch (final IOException ignore) { - } - } - } - - private static void closeQuietly(final Socket socket) { - if (null != socket) { - try { - socket.setSoLinger(true, 0); - socket.close(); - } catch (final IOException ignore) { - } - } - } - } - - private static void closeQuietly(final ExecutorService executor) { - if (null != executor) { - executor.shutdownNow(); - } - } - - private static void closeQuietly(final TestSocketServer testServer) { - if (null != testServer) { - testServer.close(); - } - } -} diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/net/SocketReconnectTest.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/net/SocketReconnectTest.java deleted file mode 100644 index 49336f2..0000000 --- a/log4j-core/src/test/java/org/apache/logging/log4j/core/net/SocketReconnectTest.java +++ /dev/null @@ -1,323 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache license, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the license for the specific language governing permissions and - * limitations under the license. - */ -package org.apache.logging.log4j.core.net; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.core.LoggerContext; -import org.apache.logging.log4j.core.appender.AppenderLoggingException; -import org.apache.logging.log4j.core.config.Configurator; -import org.apache.logging.log4j.core.test.AvailablePortFinder; -import org.apache.logging.log4j.util.Strings; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.SocketException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.List; - -import static org.junit.jupiter.api.Assertions.*; - -//@Disabled("Currently needs better port choosing support") -public class SocketReconnectTest { - private static final int SOCKET_PORT1 = AvailablePortFinder.getNextAvailable(); - private static final int SOCKET_PORT2 = AvailablePortFinder.getNextAvailable(); - - private static final String CONFIG = "log4j-socket.xml"; - - private static final String SHUTDOWN = "Shutdown" + Strings.LINE_SEPARATOR + - "................................................................" + Strings.LINE_SEPARATOR + - "................................................................" + Strings.LINE_SEPARATOR + - "................................................................" + Strings.LINE_SEPARATOR + - "................................................................" + Strings.LINE_SEPARATOR; - - public static LocalHostResolver resolver = new LocalHostResolver(); - - private static LoggerContext loggerContext; - - private static final List<String> list = new ArrayList<>(); - private static int[] ports; - private static TestSocketServer server1; - private static TestSocketServer server2; - private static Logger logger; - - - @BeforeAll - public static void beforeClass() throws IOException, InterruptedException { - server1 = new TestSocketServer(0); - server2 = new TestSocketServer(0); - server1.start(); - server2.start(); - Thread.sleep(100); - ports = new int[] { server1.getPort(), server2.getPort()}; - resolver.ports = ports; - TcpSocketManager.setHostResolver(resolver); - loggerContext = Configurator.initialize("SocketReconnectTest", SocketReconnectTest.class.getClassLoader(), - CONFIG); - logger = LogManager.getLogger(SocketReconnectTest.class); - server1.shutdown(); - server1.join(); - server2.shutdown(); - server2.join(); - server1 = null; - server2 = null; - Thread.sleep(100); - list.clear(); - } - - @AfterAll - public static void afterClass() { - Configurator.shutdown(loggerContext); - } - - @AfterEach - public void after() throws InterruptedException { - if (server1 != null) { - server1.shutdown(); - server1.join(); - } - if (server2 != null) { - server2.shutdown(); - server2.join(); - } - server1 = null; - server2 = null; - Thread.sleep(300); - } - - @Test - public void testReconnect() throws Exception { - list.clear(); - resolver.ports = new int[] {ports[0]}; - server1 = new TestSocketServer(ports[0]); - server1.start(); - Thread.sleep(200); - String message = "Log #1"; - String msg = null; - for (int i = 0; i < 5; ++i) { - logger.error(message); - Thread.sleep(100); - if (list.size() > 0) { - msg = list.get(0); - if (msg != null) { - break; - } - } - } - assertNotNull(msg, "No message"); - assertEquals(message, msg); - - logger.error(SHUTDOWN); - server1.join(); - - list.clear(); - - message = "Log #2"; - boolean exceptionCaught = false; - - for (int i = 0; i < 100; ++i) { - try { - logger.error(message); - } catch (final AppenderLoggingException e) { - exceptionCaught = true; - break; - // System.err.println("Caught expected exception"); - } - } - assertTrue(exceptionCaught, "No Exception thrown"); - message = "Log #3"; - - - server1 = new TestSocketServer(ports[0]); - server1.start(); - Thread.sleep(300); - - msg = null; - for (int i = 0; i < 5; ++i) { - logger.error(message); - Thread.sleep(100); - if (list.size() > 0) { - msg = list.get(0); - if (msg != null) { - break; - } - } - } - assertNotNull(msg, "No message"); - assertEquals(message, msg); - logger.error(SHUTDOWN); - server1.join(); - } - - @Test - public void testFailover() throws Exception { - list.clear(); - server1 = new TestSocketServer(ports[0]); - server2 = new TestSocketServer(ports[1]); - resolver.ports = ports; - server1.start(); - server2.start(); - Thread.sleep(100); - - String message = "Log #1"; - - String msg = null; - for (int i = 0; i < 5; ++i) { - logger.error(message); - Thread.sleep(100); - if (list.size() > 0) { - msg = list.get(0); - if (msg != null) { - break; - } - } - } - assertNotNull(msg, "No message"); - assertEquals(message, msg); - - server1.shutdown(); - server1.join(); - - list.clear(); - - message = "Log #2"; - for (int i = 0; i < 5; ++i) { - logger.error(message); - Thread.sleep(100); - if (list.size() > 0) { - msg = list.get(0); - if (msg != null) { - break; - } - } - } - assertNotNull(msg, "No message"); - assertEquals(message, msg); - - server2.shutdown(); - server2.join(); - } - - - private static class TestSocketServer extends Thread { - private volatile boolean shutdown = false; - private volatile boolean started = false; - private volatile Socket client; - private final int port; - private ServerSocket server; - - public TestSocketServer(int port) throws IOException { - this.port = port; - server = new ServerSocket(port); - } - - public int getPort() { - return port == 0 ? server.getLocalPort() : port; - } - - public void shutdown() { - if (!shutdown) { - shutdown = true; - if (server != null && server.isBound()) { - try { - if (client != null) { - Socket serverClient = client; - client = null; - serverClient.shutdownInput(); - serverClient.shutdownOutput(); - serverClient.setSoLinger(true, 0); - serverClient.close(); - } - ServerSocket serverSocket = server; - server = null; - serverSocket.close(); - } catch (Exception ex) { - System.out.println("Unable to send shutdown message"); - ex.printStackTrace(); - } - } - return; - } - } - - @Override - public void run() { - client = null; - try { - client = server.accept(); - started = true; - while (!shutdown) { - final BufferedReader reader = new BufferedReader(new InputStreamReader(client.getInputStream())); - final String line = reader.readLine(); - if (line == null || line.equals("Shutdown")) { - shutdown = true; - } else { - list.add(line); - } - } - } catch (final SocketException ex) { - if (!shutdown) { - ex.printStackTrace(); - } - } catch (final Exception ex) { - ex.printStackTrace(); - } finally { - if (client != null && !client.isClosed()) { - try { - client.setSoLinger(true, 0); - client.shutdownOutput(); - client.close(); - } catch (final Exception ex) { - System.out.println("Unable to close socket: " + ex.getMessage()); - } - } - if (server != null && !server.isClosed()) { - try { - server.close(); - } catch (final Exception ex) { - System.out.println("Unable to close server socket: " + ex.getMessage()); - } - } - } - } - } - - private static class LocalHostResolver extends TcpSocketManager.HostResolver { - public volatile int[] ports; - - @Override - public List<InetSocketAddress> resolveHost(String host, int port) throws UnknownHostException { - int[] socketPorts = ports; - List<InetSocketAddress> socketAddresses = new ArrayList<>(ports.length); - InetAddress addr = InetAddress.getLocalHost(); - for (int i = 0; i < socketPorts.length; ++i){ - socketAddresses.add(new InetSocketAddress(addr, socketPorts[i])); - } - return socketAddresses; - } - } -} diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/net/SocketTest.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/net/SocketTest.java deleted file mode 100644 index 7f1145d..0000000 --- a/log4j-core/src/test/java/org/apache/logging/log4j/core/net/SocketTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache license, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the license for the specific language governing permissions and - * limitations under the license. - */ -package org.apache.logging.log4j.core.net; - -import java.io.IOException; -import java.io.InputStream; -import java.net.ServerSocket; -import java.net.Socket; -import java.util.concurrent.Callable; - -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.core.LoggerContext; -import org.apache.logging.log4j.core.appender.AppenderLoggingException; -import org.apache.logging.log4j.core.test.junit.LoggerContextSource; -import org.apache.logging.log4j.core.test.AvailablePortFinder; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.*; - -@Disabled("Currently needs better port choosing support") -@LoggerContextSource("log4j-socket.xml") -public class SocketTest { - private static final int SOCKET_PORT = AvailablePortFinder.getNextAvailable(); - - @Test - public void testConnect(final LoggerContext context) { - System.err.println("Initializing logger"); - Logger logger = assertDoesNotThrow(() -> context.getLogger(getClass().getName())); - assertThrows(AppenderLoggingException.class, () -> logger.error("Log #1")); - } - - private static class TestSocketServer implements Callable<InputStream> { - private ServerSocket server; - private Socket client; - - @Override - public InputStream call() throws Exception { - server = new ServerSocket(SOCKET_PORT); - client = server.accept(); - return client.getInputStream(); - } - - public void close() { - closeQuietly(client); - closeQuietly(server); - } - - private static void closeQuietly(final ServerSocket socket) { - if (null != socket) { - try { - socket.close(); - } catch (final IOException ignore) { - } - } - } - - private static void closeQuietly(final Socket socket) { - if (null != socket) { - try { - socket.close(); - } catch (final IOException ignore) { - } - } - } - } - -} diff --git a/src/changes/changes.xml b/src/changes/changes.xml index 00172f4..a167fc5 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -237,6 +237,9 @@ based on performance improvements in modern Java releases. </action> <!-- FIXES --> + <action issue="LOG4J2-2829" dev="vy" type="fix"> + SocketAppender should propagate failures when reconnection fails. + </action> <action issue="LOG4J2-3172" dev="vy" type="fix" due-to="Barry Fleming"> Buffer immutable log events in the SmtpManager. </action>