Repository: logging-log4j2
Updated Branches:
  refs/heads/master 132a53afd -> a5f3b03aa


[LOG4J2-1311] SocketAppender will lose several events after
re-connection to server.

Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/a5f3b03a
Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/a5f3b03a
Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/a5f3b03a

Branch: refs/heads/master
Commit: a5f3b03aa1f0ab9f301a830371544064aa5e4c3d
Parents: 132a53a
Author: Gary Gregory <[email protected]>
Authored: Sat Jul 29 00:55:16 2017 -0700
Committer: Gary Gregory <[email protected]>
Committed: Sat Jul 29 00:55:16 2017 -0700

----------------------------------------------------------------------
 log4j-core-its/pom.xml                          |   6 ++
 .../net/AbstractSocketAppenderReconnectIT.java  | 102 +++++++++++++++++++
 .../net/SocketAppenderConnectPostStartupIT.java |  78 ++++++++++++++
 .../net/SocketAppenderConnectReConnectIT.java   |  95 +++++++++++++++++
 .../log4j/core/net/TcpSocketManager.java        |  91 ++++++++++++-----
 src/changes/changes.xml                         |   3 +
 6 files changed, 350 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/a5f3b03a/log4j-core-its/pom.xml
----------------------------------------------------------------------
diff --git a/log4j-core-its/pom.xml b/log4j-core-its/pom.xml
index a9985dd..ce60982 100644
--- a/log4j-core-its/pom.xml
+++ b/log4j-core-its/pom.xml
@@ -49,6 +49,12 @@
     </dependency>
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-server</artifactId>
+      <version>2.9-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-core</artifactId>
       <type>test-jar</type>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/a5f3b03a/log4j-core-its/src/test/java/org/apache/logging/log4j/core/appender/net/AbstractSocketAppenderReconnectIT.java
----------------------------------------------------------------------
diff --git 
a/log4j-core-its/src/test/java/org/apache/logging/log4j/core/appender/net/AbstractSocketAppenderReconnectIT.java
 
b/log4j-core-its/src/test/java/org/apache/logging/log4j/core/appender/net/AbstractSocketAppenderReconnectIT.java
new file mode 100644
index 0000000..2a146a3
--- /dev/null
+++ 
b/log4j-core-its/src/test/java/org/apache/logging/log4j/core/appender/net/AbstractSocketAppenderReconnectIT.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender.net;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.AbstractManager;
+import org.apache.logging.log4j.core.appender.SocketAppender;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.core.net.AbstractSocketManager;
+import org.apache.logging.log4j.message.StringMapMessage;
+import org.apache.logging.log4j.server.TcpSocketServer;
+import org.junit.After;
+import org.junit.Assert;
+
+/**
+ * Base class for tests that reconnect to an Apache Socket Server. The class 
makes sure resources are properly shut down
+ * after each @Test method. A subclass normally has only one @Test method.
+ * <p>
+ * LOG4J2-1311 SocketAppender will lose the first several logs after re-connection 
to log servers.
+ * </p>
+ */
+public class AbstractSocketAppenderReconnectIT {
+
+    protected SocketAppender appender;
+    protected int port;
+    protected TcpSocketServer<InputStream> server;
+    protected Thread thread;
+
+    @After
+    public void after() {
+        shutdown();
+        if (appender != null) {
+            appender.stop();
+            // Make sure the manager is gone so that it has no bad side effects on 
other tests.
+            @SuppressWarnings("resource")
+            final AbstractSocketManager appenderManager = 
appender.getManager();
+            if (appenderManager != null) {
+                
Assert.assertFalse(AbstractManager.hasManager(appenderManager.getName()));
+            }
+        }
+    }
+
+    protected void appendEvent(final SocketAppender appender) {
+        final Map<String, String> map = new HashMap<>();
+        final String messageText = "Hello, World!";
+        final String loggerName = this.getClass().getName();
+        map.put("messageText", messageText);
+        map.put("threadName", Thread.currentThread().getName());
+        // @formatter:off
+               final LogEvent event = Log4jLogEvent.newBuilder()
+                               .setLoggerName(loggerName)
+                               .setLoggerFqcn(loggerName)
+                               .setLevel(Level.INFO)
+                               .setMessage(new StringMapMessage(map))
+                               .setTimeMillis(System.currentTimeMillis())
+                               .build();
+               // @formatter:on
+        appender.append(event);
+    }
+
+    protected void shutdown() {
+        try {
+            server.shutdown();
+        } catch (final IOException e) {
+            e.printStackTrace();
+        }
+        try {
+            thread.join();
+        } catch (final InterruptedException e) {
+            // ignore
+        }
+    }
+
+    protected Thread startServer(long sleepMillis) throws InterruptedException 
{
+        thread = server.startNewThread();
+        if (sleepMillis >= 0) {
+            Thread.sleep(sleepMillis);
+        }
+        return thread;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/a5f3b03a/log4j-core-its/src/test/java/org/apache/logging/log4j/core/appender/net/SocketAppenderConnectPostStartupIT.java
----------------------------------------------------------------------
diff --git 
a/log4j-core-its/src/test/java/org/apache/logging/log4j/core/appender/net/SocketAppenderConnectPostStartupIT.java
 
b/log4j-core-its/src/test/java/org/apache/logging/log4j/core/appender/net/SocketAppenderConnectPostStartupIT.java
new file mode 100644
index 0000000..7f781dc
--- /dev/null
+++ 
b/log4j-core-its/src/test/java/org/apache/logging/log4j/core/appender/net/SocketAppenderConnectPostStartupIT.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender.net;
+
+import org.apache.logging.log4j.core.appender.AppenderLoggingException;
+import org.apache.logging.log4j.core.appender.SocketAppender;
+import org.apache.logging.log4j.core.layout.JsonLayout;
+import org.apache.logging.log4j.core.net.TcpSocketManager;
+import org.apache.logging.log4j.server.TcpSocketServer;
+import org.apache.logging.log4j.test.AvailablePortFinder;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests that a Socket Appender starts when there is no server and connects to the 
server when it is started later.
+ * <p>
+ * LOG4J2-1311 SocketAppender will lose the first several logs after re-connection 
to log servers. See
+ * https://issues.apache.org/jira/browse/LOG4J2-1311
+ * </p>
+ * <p>
+ * This test class' single test method performs the following:
+ * </p>
+ * <ol>
+ * <li>Starts a SocketAppender (no reconnect thread is running)</li>
+ * <li>Logs an event (fails and the manager starts its reconnect thread)</li>
+ * <li>Starts Apache Socket Server</li>
+ * <li>Logs an event successfully</li>
+ * </ol>
+ */
+public class SocketAppenderConnectPostStartupIT extends 
AbstractSocketAppenderReconnectIT {
+
+    @Test
+    public void testConnectPostStartup() throws Exception {
+        //
+        // Start appender
+        final int port = AvailablePortFinder.getNextAvailable();
+        // Start appender, fails to connect and starts reconnect thread.
+        // @formatter:off
+        appender = SocketAppender.newBuilder()
+                .withPort(port)
+                .withReconnectDelayMillis(1000)
+                .withName("test")
+                .withLayout(JsonLayout.newBuilder().build())
+                .build();
+        // @formatter:on
+        appender.start();
+        //
+        // Logging will fail but the socket manager is still running its 
reconnect thread, waiting for the server.
+        try {
+            appendEvent(appender);
+            Assert.fail("Expected to catch a " + 
AppenderLoggingException.class.getName());
+        } catch (final AppenderLoggingException e) {
+            // Expected.
+        }
+        //
+        // Start server
+        server = TcpSocketServer.createJsonSocketServer(port);
+        // Wait to allow the reconnect thread to connect
+        startServer(((TcpSocketManager) 
appender.getManager()).getReconnectionDelayMillis() * 2);
+        //
+        // Logging now succeeds.
+        appendEvent(appender);
+    }
+}

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/a5f3b03a/log4j-core-its/src/test/java/org/apache/logging/log4j/core/appender/net/SocketAppenderConnectReConnectIT.java
----------------------------------------------------------------------
diff --git 
a/log4j-core-its/src/test/java/org/apache/logging/log4j/core/appender/net/SocketAppenderConnectReConnectIT.java
 
b/log4j-core-its/src/test/java/org/apache/logging/log4j/core/appender/net/SocketAppenderConnectReConnectIT.java
new file mode 100644
index 0000000..a39baa7
--- /dev/null
+++ 
b/log4j-core-its/src/test/java/org/apache/logging/log4j/core/appender/net/SocketAppenderConnectReConnectIT.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender.net;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import org.apache.logging.log4j.core.appender.SocketAppender;
+import org.apache.logging.log4j.core.layout.JsonLayout;
+import org.apache.logging.log4j.server.TcpSocketServer;
+import org.apache.logging.log4j.test.AvailablePortFinder;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests that a Socket Appender can reconnect to a server after it has been 
recycled.
+ * <p>
+ * LOG4J2-1311 SocketAppender will lose the first several logs after 
re-connection to log servers.
+ * </p>
+ * <p>
+ * This test class' single test method performs the following:
+ * </p>
+ * <ol>
+ * <li>Starts Apache Socket Server</li>
+ * <li>Starts a Socket Appender</li>
+ * <li>Logs an event OK</li>
+ * <li>Stops Apache Socket Server</li>
+ * <li>Starts Apache Socket Server</li>
+ * <li>Logs an event</li>
+ * </ol>
+ */
+public class SocketAppenderConnectReConnectIT extends 
AbstractSocketAppenderReconnectIT {
+
+    @Test
+    public void testConnectReConnect() throws Exception {
+        port = AvailablePortFinder.getNextAvailable();
+        // Start server
+        server = TcpSocketServer.createJsonSocketServer(port);
+        startServer(200);
+        // Start appender
+        // @formatter:off
+        appender = SocketAppender.newBuilder()
+                .withPort(port)
+                .withReconnectDelayMillis(1000)
+                .withName("test")
+                .withLayout(JsonLayout.newBuilder().build())
+                .build();
+        // @formatter:on
+        appender.start();
+        // Log message
+        appendEvent(appender);
+        // Stop server
+        shutdown();
+        // I should not be able to connect to the server now
+        try {
+            try (Socket socket = new Socket("localhost", port)) {
+                Assert.fail("The server socket should not be opened: " + 
socket);
+            }
+        } catch (IOException e) {
+            // expected
+        }
+        // HACK START - Gary
+        // SANS HACK, the test passes, as somehow the socket in the appender 
is still valid
+        // On Windows 10, I did not try other OSs:
+        // HERE, I BREAKPOINT AND GO TO THE OS AND FORCE THE TCP CONNECTION TO 
CLOSE (TcpView.exe)), SUCH THAT
+        // INTERNALLY THE MANAGER GETS:
+        // java.net.SocketException: Connection reset by peer: socket write 
error
+        // HACK END
+        //
+        // Restart server on the SAME port
+        server = TcpSocketServer.createJsonSocketServer(port);
+        thread = startServer(0);
+        try (Socket socket = new Socket("localhost", port)) {
+            Assert.assertTrue(socket.isBound());
+            Assert.assertFalse(socket.isClosed());
+        }
+        // Logging again should cause the appender to reconnect
+        appendEvent(appender);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/a5f3b03a/log4j-core/src/main/java/org/apache/logging/log4j/core/net/TcpSocketManager.java
----------------------------------------------------------------------
diff --git 
a/log4j-core/src/main/java/org/apache/logging/log4j/core/net/TcpSocketManager.java
 
b/log4j-core/src/main/java/org/apache/logging/log4j/core/net/TcpSocketManager.java
index c39a817..ede980e 100644
--- 
a/log4j-core/src/main/java/org/apache/logging/log4j/core/net/TcpSocketManager.java
+++ 
b/log4j-core/src/main/java/org/apache/logging/log4j/core/net/TcpSocketManager.java
@@ -100,8 +100,8 @@ public class TcpSocketManager extends AbstractSocketManager 
{
             final InetAddress inetAddress, final String host, final int port, 
final int connectTimeoutMillis,
             final int reconnectionDelayMillis, final boolean immediateFail, 
final Layout<? extends Serializable> layout,
             final int bufferSize) {
-        this(name, os, socket, inetAddress, host, port, connectTimeoutMillis, 
reconnectionDelayMillis, immediateFail, layout, bufferSize,
-                null);
+        this(name, os, socket, inetAddress, host, port, connectTimeoutMillis, 
reconnectionDelayMillis, immediateFail,
+                layout, bufferSize, null);
     }
 
     /**
@@ -202,7 +202,7 @@ public class TcpSocketManager extends AbstractSocketManager 
{
                 connectTimeoutMillis, reconnectDelayMillis, immediateFail, 
layout, bufferSize, socketOptions), FACTORY);
     }
 
-    @SuppressWarnings("sync-override") // synchronization on "this" is done 
within the method 
+    @SuppressWarnings("sync-override") // synchronization on "this" is done 
within the method
     @Override
     protected void write(final byte[] bytes, final int offset, final int 
length, final boolean immediateFlush) {
         if (socket == null) {
@@ -215,22 +215,43 @@ public class TcpSocketManager extends 
AbstractSocketManager {
         }
         synchronized (this) {
             try {
-                @SuppressWarnings("resource") // outputStream is managed by 
this class 
-                final OutputStream outputStream = getOutputStream();
-                outputStream.write(bytes, offset, length);
-                if (immediateFlush) {
-                    outputStream.flush();
-                }
-            } catch (final IOException ex) {
+                writeAndFlush(bytes, offset, length, immediateFlush);
+            } catch (final IOException causeEx) {
                 if (retry && reconnector == null) {
+                    final String config = inetAddress + ":" + port;
                     reconnector = createReconnector();
-                    reconnector.start();
+                    try {
+                        reconnector.reconnect();
+                    } catch (IOException reconnEx) {
+                        LOGGER.debug("Cannot reestablish socket connection to 
{}: {}; starting reconnector thread {}",
+                                config, reconnEx.getLocalizedMessage(), 
reconnector.getName(), reconnEx);
+                        reconnector.start();
+                        throw new AppenderLoggingException(
+                                String.format("Error sending to %s for %s", 
getName(), config), causeEx);
+                    }
+                    try {
+                        writeAndFlush(bytes, offset, length, immediateFlush);
+                    } catch (IOException e) {
+                        throw new AppenderLoggingException(
+                                String.format("Error writing to %s after 
reestablishing connection for %s", getName(),
+                                        config),
+                                causeEx);
+                    }
                 }
-                throw new AppenderLoggingException("Error writing to " + 
getName(), ex);
             }
         }
     }
 
+    private void writeAndFlush(final byte[] bytes, final int offset, final int 
length, final boolean immediateFlush)
+            throws IOException {
+        @SuppressWarnings("resource") // outputStream is managed by this class
+        final OutputStream outputStream = getOutputStream();
+        outputStream.write(bytes, offset, length);
+        if (immediateFlush) {
+            outputStream.flush();
+        }
+    }
+
     @Override
     protected synchronized boolean closeOutputStream() {
         final boolean closed = super.closeOutputStream();
@@ -306,17 +327,7 @@ public class TcpSocketManager extends 
AbstractSocketManager {
             while (!shutdown) {
                 try {
                     sleep(reconnectionDelayMillis);
-                    final Socket sock = createSocket(inetAddress, port);
-                    @SuppressWarnings("resource") // newOS is managed by the 
enclosing Manager.
-                    final OutputStream newOS = sock.getOutputStream();
-                    synchronized (owner) {
-                        Closer.closeSilently(getOutputStream());
-                        setOutputStream(newOS);
-                        socket = sock;
-                        reconnector = null;
-                        shutdown = true;
-                    }
-                    LOGGER.debug("Connection to {}:{} reestablished: {}", 
host, port, socket);
+                    reconnect();
                 } catch (final InterruptedException ie) {
                     LOGGER.debug("Reconnection interrupted.");
                 } catch (final ConnectException ex) {
@@ -328,6 +339,25 @@ public class TcpSocketManager extends 
AbstractSocketManager {
                 }
             }
         }
+
+        void reconnect() throws IOException {
+            final Socket sock = createSocket(inetAddress, port);
+            @SuppressWarnings("resource") // newOS is managed by the enclosing 
Manager.
+            final OutputStream newOS = sock.getOutputStream();
+            synchronized (owner) {
+                Closer.closeSilently(getOutputStream());
+                setOutputStream(newOS);
+                socket = sock;
+                reconnector = null;
+                shutdown = true;
+            }
+            LOGGER.debug("Connection to {}:{} reestablished: {}", host, port, 
socket);
+        }
+
+        @Override
+        public String toString() {
+            return "Reconnector [latch=" + latch + ", shutdown=" + shutdown + 
", owner=" + owner + "]";
+        }
     }
 
     private Reconnector createReconnector() {
@@ -375,13 +405,20 @@ public class TcpSocketManager extends 
AbstractSocketManager {
             this.bufferSize = bufferSize;
             this.socketOptions = socketOptions;
         }
+
+        @Override
+        public String toString() {
+            return "FactoryData [host=" + host + ", port=" + port + ", 
connectTimeoutMillis=" + connectTimeoutMillis
+                    + ", reconnectDelayMillis=" + reconnectDelayMillis + ", 
immediateFail=" + immediateFail
+                    + ", layout=" + layout + ", bufferSize=" + bufferSize + ", 
socketOptions=" + socketOptions + "]";
+        }
     }
 
     /**
      * Factory to create a TcpSocketManager.
      */
     protected static class TcpSocketManagerFactory implements 
ManagerFactory<TcpSocketManager, FactoryData> {
-        
+
         @SuppressWarnings("resource")
         @Override
         public TcpSocketManager createManager(final String name, final 
FactoryData data) {
@@ -403,7 +440,7 @@ public class TcpSocketManager extends AbstractSocketManager 
{
                         data.connectTimeoutMillis, data.reconnectDelayMillis, 
data.immediateFail, data.layout,
                         data.bufferSize, data.socketOptions);
             } catch (final IOException ex) {
-                LOGGER.error("TcpSocketManager (" + name + ") " + ex, ex);
+                LOGGER.error("TcpSocketManager (" + name + ") caught exception 
and will continue:" + ex, ex);
                 os = NullOutputStream.getInstance();
             }
             if (data.reconnectDelayMillis == 0) {
@@ -440,4 +477,8 @@ public class TcpSocketManager extends AbstractSocketManager 
{
         return socket;
     }
 
+    public int getReconnectionDelayMillis() {
+        return reconnectionDelayMillis;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/a5f3b03a/src/changes/changes.xml
----------------------------------------------------------------------
diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index d8a6042..a3f6cdc 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -49,6 +49,9 @@
       <action issue="LOG4J2-1990" dev="ggregory" type="fix" due-to="Philippe 
Mouawad">
         ConcurrentModificationException logging a parameter of type Map.
       </action>
+      <action issue="LOG4J2-1311" dev="ggregory" type="fix" due-to="Xibing 
Liang">
+        SocketAppender will lose several events after re-connection to server.
+      </action>
       <action issue="LOG4J2-1977" dev="ggregory" type="fix" due-to="Jerry 
xnslong">
         Consider the StringBuilder's capacity instead of content length when 
trimming.
       </action>

Reply via email to