Repository: logging-log4j2 Updated Branches: refs/heads/master 132a53afd -> a5f3b03aa
[LOG4J2-1311] SocketAppender will lose several events after re-connection to server. Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/a5f3b03a Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/a5f3b03a Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/a5f3b03a Branch: refs/heads/master Commit: a5f3b03aa1f0ab9f301a830371544064aa5e4c3d Parents: 132a53a Author: Gary Gregory <[email protected]> Authored: Sat Jul 29 00:55:16 2017 -0700 Committer: Gary Gregory <[email protected]> Committed: Sat Jul 29 00:55:16 2017 -0700 ---------------------------------------------------------------------- log4j-core-its/pom.xml | 6 ++ .../net/AbstractSocketAppenderReconnectIT.java | 102 +++++++++++++++++++ .../net/SocketAppenderConnectPostStartupIT.java | 78 ++++++++++++++ .../net/SocketAppenderConnectReConnectIT.java | 95 +++++++++++++++++ .../log4j/core/net/TcpSocketManager.java | 91 ++++++++++++----- src/changes/changes.xml | 3 + 6 files changed, 350 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/a5f3b03a/log4j-core-its/pom.xml ---------------------------------------------------------------------- diff --git a/log4j-core-its/pom.xml b/log4j-core-its/pom.xml index a9985dd..ce60982 100644 --- a/log4j-core-its/pom.xml +++ b/log4j-core-its/pom.xml @@ -49,6 +49,12 @@ </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-server</artifactId> + <version>2.9-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <type>test-jar</type> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/a5f3b03a/log4j-core-its/src/test/java/org/apache/logging/log4j/core/appender/net/AbstractSocketAppenderReconnectIT.java 
---------------------------------------------------------------------- diff --git a/log4j-core-its/src/test/java/org/apache/logging/log4j/core/appender/net/AbstractSocketAppenderReconnectIT.java b/log4j-core-its/src/test/java/org/apache/logging/log4j/core/appender/net/AbstractSocketAppenderReconnectIT.java new file mode 100644 index 0000000..2a146a3 --- /dev/null +++ b/log4j-core-its/src/test/java/org/apache/logging/log4j/core/appender/net/AbstractSocketAppenderReconnectIT.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. 
+ */ +package org.apache.logging.log4j.core.appender.net; + +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.appender.AbstractManager; +import org.apache.logging.log4j.core.appender.SocketAppender; +import org.apache.logging.log4j.core.impl.Log4jLogEvent; +import org.apache.logging.log4j.core.net.AbstractSocketManager; +import org.apache.logging.log4j.message.StringMapMessage; +import org.apache.logging.log4j.server.TcpSocketServer; +import org.junit.After; +import org.junit.Assert; + +/** + * Base class for tests that reconnect to an Apache Socket Server. The class makes sure resources are properly shut down + * after each @Test method. A subclass normally only has one @Test method. + * <p> + * LOG4J2-1311 SocketAppender will lose the first several logs after re-connection to log servers. + * </p> + */ +public class AbstractSocketAppenderReconnectIT { + + protected SocketAppender appender; + protected int port; + protected TcpSocketServer<InputStream> server; + protected Thread thread; + + @After + public void after() { + shutdown(); + if (appender != null) { + appender.stop(); + // Make sure the manager is gone so as not to have bad side effects on other tests. 
+ @SuppressWarnings("resource") + final AbstractSocketManager appenderManager = appender.getManager(); + if (appenderManager != null) { + Assert.assertFalse(AbstractManager.hasManager(appenderManager.getName())); + } + } + } + + protected void appendEvent(final SocketAppender appender) { + final Map<String, String> map = new HashMap<>(); + final String messageText = "Hello, World!"; + final String loggerName = this.getClass().getName(); + map.put("messageText", messageText); + map.put("threadName", Thread.currentThread().getName()); + // @formatter:off + final LogEvent event = Log4jLogEvent.newBuilder() + .setLoggerName(loggerName) + .setLoggerFqcn(loggerName) + .setLevel(Level.INFO) + .setMessage(new StringMapMessage(map)) + .setTimeMillis(System.currentTimeMillis()) + .build(); + // @formatter:on + appender.append(event); + } + + protected void shutdown() { + try { + server.shutdown(); + } catch (final IOException e) { + e.printStackTrace(); + } + try { + thread.join(); + } catch (final InterruptedException e) { + // ignore + } + } + + protected Thread startServer(long sleepMillis) throws InterruptedException { + thread = server.startNewThread(); + if (sleepMillis >= 0) { + Thread.sleep(sleepMillis); + } + return thread; + } + +} http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/a5f3b03a/log4j-core-its/src/test/java/org/apache/logging/log4j/core/appender/net/SocketAppenderConnectPostStartupIT.java ---------------------------------------------------------------------- diff --git a/log4j-core-its/src/test/java/org/apache/logging/log4j/core/appender/net/SocketAppenderConnectPostStartupIT.java b/log4j-core-its/src/test/java/org/apache/logging/log4j/core/appender/net/SocketAppenderConnectPostStartupIT.java new file mode 100644 index 0000000..7f781dc --- /dev/null +++ b/log4j-core-its/src/test/java/org/apache/logging/log4j/core/appender/net/SocketAppenderConnectPostStartupIT.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.logging.log4j.core.appender.net; + +import org.apache.logging.log4j.core.appender.AppenderLoggingException; +import org.apache.logging.log4j.core.appender.SocketAppender; +import org.apache.logging.log4j.core.layout.JsonLayout; +import org.apache.logging.log4j.core.net.TcpSocketManager; +import org.apache.logging.log4j.server.TcpSocketServer; +import org.apache.logging.log4j.test.AvailablePortFinder; +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests that a Socket Appender starts when there is no server and connects to the server when it is started later. + * <p> + * LOG4J2-1934 JMS Appender does not know how to recover from a broken connection. 
See + * https://issues.apache.org/jira/browse/LOG4J2-1934 + * </p> + * <p> + * This test class' single test method performs the following: + * </p> + * <ol> + * <li>Starts a SocketAppender (no reconnect thread is running)</li> + * <li>Logs an event (fails and the manager starts its reconnect thread)</li> + * <li>Starts Apache Socket Server</li> + * <li>Logs an event successfully</li> + * </ol> + */ +public class SocketAppenderConnectPostStartupIT extends AbstractSocketAppenderReconnectIT { + + @Test + public void testConnectPostStartup() throws Exception { + // + // Start appender + final int port = AvailablePortFinder.getNextAvailable(); + // Start appender, fails to connect and starts reconnect thread. + // @formatter:off + appender = SocketAppender.newBuilder() + .withPort(port) + .withReconnectDelayMillis(1000) + .withName("test") + .withLayout(JsonLayout.newBuilder().build()) + .build(); + // @formatter:on + appender.start(); + // + // Logging will fail but the socket manager is still running its reconnect thread, waiting for the server. + try { + appendEvent(appender); + Assert.fail("Expected to catch a " + AppenderLoggingException.class.getName()); + } catch (final AppenderLoggingException e) { + // Expected. + } + // + // Start server + server = TcpSocketServer.createJsonSocketServer(port); + // Wait to allow the reconnect thread to connect + startServer(((TcpSocketManager) appender.getManager()).getReconnectionDelayMillis() * 2); + // + // Logging now succeeds. 
+ appendEvent(appender); + } +} http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/a5f3b03a/log4j-core-its/src/test/java/org/apache/logging/log4j/core/appender/net/SocketAppenderConnectReConnectIT.java ---------------------------------------------------------------------- diff --git a/log4j-core-its/src/test/java/org/apache/logging/log4j/core/appender/net/SocketAppenderConnectReConnectIT.java b/log4j-core-its/src/test/java/org/apache/logging/log4j/core/appender/net/SocketAppenderConnectReConnectIT.java new file mode 100644 index 0000000..a39baa7 --- /dev/null +++ b/log4j-core-its/src/test/java/org/apache/logging/log4j/core/appender/net/SocketAppenderConnectReConnectIT.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. 
+ */ +package org.apache.logging.log4j.core.appender.net; + +import java.io.IOException; +import java.net.Socket; + +import org.apache.logging.log4j.core.appender.SocketAppender; +import org.apache.logging.log4j.core.layout.JsonLayout; +import org.apache.logging.log4j.server.TcpSocketServer; +import org.apache.logging.log4j.test.AvailablePortFinder; +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests that a Socket Appender can reconnect to a server after it has been recycled. + * <p> + * LOG4J2-1311 SocketAppender will lose the first several logs after re-connection to log servers. + * </p> + * <p> + * This test class' single test method performs the following: + * </p> + * <ol> + * <li>Starts Apache Socket Server</li> + * <li>Starts a Socket Appender</li> + * <li>Logs an event OK</li> + * <li>Stops Apache Socket Server</li> + * <li>Starts Apache Socket Server</li> + * <li>Logs an event</li> + * </ol> + */ +public class SocketAppenderConnectReConnectIT extends AbstractSocketAppenderReconnectIT { + + @Test + public void testConnectReConnect() throws Exception { + port = AvailablePortFinder.getNextAvailable(); + // Start server + server = TcpSocketServer.createJsonSocketServer(port); + startServer(200); + // Start appender + // @formatter:off + appender = SocketAppender.newBuilder() + .withPort(port) + .withReconnectDelayMillis(1000) + .withName("test") + .withLayout(JsonLayout.newBuilder().build()) + .build(); + // @formatter:on + appender.start(); + // Log message + appendEvent(appender); + // Stop server + shutdown(); + // I should not be able to connect to the server now + try { + try (Socket socket = new Socket("localhost", port)) { + Assert.fail("The server socket should not be opened: " + socket); + } + } catch (IOException e) { + // expected + } + // HACK START - Gary + // SANS HACK, the test passes, as somehow the socket in the appender is still valid + // On Windows 10, I did not try other OSs: + // HERE, I BREAKPOINT AND GO TO THE OS AND FORCE 
THE TCP CONNECTION TO CLOSE (TcpView.exe)), SUCH THAT + // INTERNALLY THE MANAGER GETS: + // java.net.SocketException: Connection reset by peer: socket write error + // HACK END + // + // Restart server on the SAME port + server = TcpSocketServer.createJsonSocketServer(port); + thread = startServer(0); + try (Socket socket = new Socket("localhost", port)) { + Assert.assertTrue(socket.isBound()); + Assert.assertFalse(socket.isClosed()); + } + // Logging again should cause the appender to reconnect + appendEvent(appender); + } + +} http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/a5f3b03a/log4j-core/src/main/java/org/apache/logging/log4j/core/net/TcpSocketManager.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/net/TcpSocketManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/net/TcpSocketManager.java index c39a817..ede980e 100644 --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/net/TcpSocketManager.java +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/net/TcpSocketManager.java @@ -100,8 +100,8 @@ public class TcpSocketManager extends AbstractSocketManager { final InetAddress inetAddress, final String host, final int port, final int connectTimeoutMillis, final int reconnectionDelayMillis, final boolean immediateFail, final Layout<? 
extends Serializable> layout, final int bufferSize) { - this(name, os, socket, inetAddress, host, port, connectTimeoutMillis, reconnectionDelayMillis, immediateFail, layout, bufferSize, - null); + this(name, os, socket, inetAddress, host, port, connectTimeoutMillis, reconnectionDelayMillis, immediateFail, + layout, bufferSize, null); } /** @@ -202,7 +202,7 @@ public class TcpSocketManager extends AbstractSocketManager { connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout, bufferSize, socketOptions), FACTORY); } - @SuppressWarnings("sync-override") // synchronization on "this" is done within the method + @SuppressWarnings("sync-override") // synchronization on "this" is done within the method @Override protected void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) { if (socket == null) { @@ -215,22 +215,43 @@ public class TcpSocketManager extends AbstractSocketManager { } synchronized (this) { try { - @SuppressWarnings("resource") // outputStream is managed by this class - final OutputStream outputStream = getOutputStream(); - outputStream.write(bytes, offset, length); - if (immediateFlush) { - outputStream.flush(); - } - } catch (final IOException ex) { + writeAndFlush(bytes, offset, length, immediateFlush); + } catch (final IOException causeEx) { if (retry && reconnector == null) { + final String config = inetAddress + ":" + port; reconnector = createReconnector(); - reconnector.start(); + try { + reconnector.reconnect(); + } catch (IOException reconnEx) { + LOGGER.debug("Cannot reestablish socket connection to {}: {}; starting reconnector thread {}", + config, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx); + reconnector.start(); + throw new AppenderLoggingException( + String.format("Error sending to %s for %s", getName(), config), causeEx); + } + try { + writeAndFlush(bytes, offset, length, immediateFlush); + } catch (IOException e) { + throw new AppenderLoggingException( + 
String.format("Error writing to %s after reestablishing connection for %s", getName(), + config), + causeEx); + } } - throw new AppenderLoggingException("Error writing to " + getName(), ex); } } } + private void writeAndFlush(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) + throws IOException { + @SuppressWarnings("resource") // outputStream is managed by this class + final OutputStream outputStream = getOutputStream(); + outputStream.write(bytes, offset, length); + if (immediateFlush) { + outputStream.flush(); + } + } + @Override protected synchronized boolean closeOutputStream() { final boolean closed = super.closeOutputStream(); @@ -306,17 +327,7 @@ public class TcpSocketManager extends AbstractSocketManager { while (!shutdown) { try { sleep(reconnectionDelayMillis); - final Socket sock = createSocket(inetAddress, port); - @SuppressWarnings("resource") // newOS is managed by the enclosing Manager. - final OutputStream newOS = sock.getOutputStream(); - synchronized (owner) { - Closer.closeSilently(getOutputStream()); - setOutputStream(newOS); - socket = sock; - reconnector = null; - shutdown = true; - } - LOGGER.debug("Connection to {}:{} reestablished: {}", host, port, socket); + reconnect(); } catch (final InterruptedException ie) { LOGGER.debug("Reconnection interrupted."); } catch (final ConnectException ex) { @@ -328,6 +339,25 @@ public class TcpSocketManager extends AbstractSocketManager { } } } + + void reconnect() throws IOException { + final Socket sock = createSocket(inetAddress, port); + @SuppressWarnings("resource") // newOS is managed by the enclosing Manager. 
+ final OutputStream newOS = sock.getOutputStream(); + synchronized (owner) { + Closer.closeSilently(getOutputStream()); + setOutputStream(newOS); + socket = sock; + reconnector = null; + shutdown = true; + } + LOGGER.debug("Connection to {}:{} reestablished: {}", host, port, socket); + } + + @Override + public String toString() { + return "Reconnector [latch=" + latch + ", shutdown=" + shutdown + ", owner=" + owner + "]"; + } } private Reconnector createReconnector() { @@ -375,13 +405,20 @@ public class TcpSocketManager extends AbstractSocketManager { this.bufferSize = bufferSize; this.socketOptions = socketOptions; } + + @Override + public String toString() { + return "FactoryData [host=" + host + ", port=" + port + ", connectTimeoutMillis=" + connectTimeoutMillis + + ", reconnectDelayMillis=" + reconnectDelayMillis + ", immediateFail=" + immediateFail + + ", layout=" + layout + ", bufferSize=" + bufferSize + ", socketOptions=" + socketOptions + "]"; + } } /** * Factory to create a TcpSocketManager. 
*/ protected static class TcpSocketManagerFactory implements ManagerFactory<TcpSocketManager, FactoryData> { - + @SuppressWarnings("resource") @Override public TcpSocketManager createManager(final String name, final FactoryData data) { @@ -403,7 +440,7 @@ public class TcpSocketManager extends AbstractSocketManager { data.connectTimeoutMillis, data.reconnectDelayMillis, data.immediateFail, data.layout, data.bufferSize, data.socketOptions); } catch (final IOException ex) { - LOGGER.error("TcpSocketManager (" + name + ") " + ex, ex); + LOGGER.error("TcpSocketManager (" + name + ") caught exception and will continue:" + ex, ex); os = NullOutputStream.getInstance(); } if (data.reconnectDelayMillis == 0) { @@ -440,4 +477,8 @@ public class TcpSocketManager extends AbstractSocketManager { return socket; } + public int getReconnectionDelayMillis() { + return reconnectionDelayMillis; + } + } http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/a5f3b03a/src/changes/changes.xml ---------------------------------------------------------------------- diff --git a/src/changes/changes.xml b/src/changes/changes.xml index d8a6042..a3f6cdc 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -49,6 +49,9 @@ <action issue="LOG4J2-1990" dev="ggregory" type="fix" due-to="Philippe Mouawad"> ConcurrentModificationException logging a parameter of type Map. </action> + <action issue="LOG4J2-1311" dev="ggregory" type="fix" due-to="Xibing Liang"> + SocketAppender will lose several events after re-connection to server. + </action> <action issue="LOG4J2-1977" dev="ggregory" type="fix" due-to="Jerry xnslong"> Consider the StringBuilder's capacity instead of content length when trimming. </action>
