Repository: activemq Updated Branches: refs/heads/activemq-5.13.x 3a4cdeb91 -> 52d2b1578
https://issues.apache.org/jira/browse/AMQ-6291 Better management of shared resources between the background run thread and the main start / stop thread. Makes sure to clean up all resources before finally throwing on stop to prevent leaking any resources. (cherry picked from commit ff99872263981982bb1ebce93c07bfb8a28d4a06) Conflicts: activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/52d2b157 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/52d2b157 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/52d2b157 Branch: refs/heads/activemq-5.13.x Commit: 52d2b157808c4dd11fe39203e0caad44d4e64a86 Parents: 3a4cdeb Author: Timothy Bish <[email protected]> Authored: Fri May 13 11:13:11 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Fri May 13 11:29:25 2016 -0400 ---------------------------------------------------------------------- .../transport/tcp/TcpTransportServer.java | 194 ++++++++++++------- 1 file changed, 127 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/52d2b157/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java index 43e6ae0..a4ed0ae 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java @@ -26,10 +26,14 @@ import java.net.SocketTimeoutException; import java.net.URI; import java.net.URISyntaxException; import 
java.net.UnknownHostException; +import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.HashMap; +import java.util.Iterator; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -47,8 +51,6 @@ import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServerThreadSupport; -import org.apache.activemq.transport.nio.SelectorManager; -import org.apache.activemq.transport.nio.SelectorSelection; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.InetAddressUtil; import org.apache.activemq.util.IntrospectionSupport; @@ -66,8 +68,8 @@ import org.slf4j.LoggerFactory; public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener { private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class); - protected ServerSocket serverSocket; - protected SelectorSelection selector; + protected volatile ServerSocket serverSocket; + protected volatile Selector selector; protected int backlog = 5000; protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory(); protected final TcpTransportFactory transportFactory; @@ -110,14 +112,14 @@ public class TcpTransportServer extends TransportServerThreadSupport implements */ protected boolean startLogging = true; protected final ServerSocketFactory serverSocketFactory; - protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>(); + protected final BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>(); protected Thread socketHandlerThread; /** * The maximum number of sockets allowed for this server 
*/ protected int maximumConnections = Integer.MAX_VALUE; - protected AtomicInteger currentTransportCount = new AtomicInteger(); + protected final AtomicInteger currentTransportCount = new AtomicInteger(); public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { @@ -134,8 +136,8 @@ public class TcpTransportServer extends TransportServerThreadSupport implements InetAddress addr = InetAddress.getByName(host); try { - this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr); - configureServerSocket(this.serverSocket); + serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr); + configureServerSocket(serverSocket); } catch (IOException e) { throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e); } @@ -143,7 +145,6 @@ public class TcpTransportServer extends TransportServerThreadSupport implements setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment())); } catch (URISyntaxException e) { - // it could be that the host name contains invalid characters such // as _ on unix platforms so lets try use the IP address instead try { @@ -299,15 +300,52 @@ public class TcpTransportServer extends TransportServerThreadSupport implements */ @Override public void run() { - final ServerSocketChannel chan = serverSocket.getChannel(); - if (chan != null) { + if (!isStopped() && !isStopping()) { + final ServerSocket serverSocket = this.serverSocket; + if (serverSocket == null) { + onAcceptError(new IOException("Server started without a valid ServerSocket")); + } + + final ServerSocketChannel channel = serverSocket.getChannel(); + if (channel != null) { + doRunWithServerSocketChannel(channel); + } else { + doRunWithServerSocket(serverSocket); + } + } + } + + private 
void doRunWithServerSocketChannel(final ServerSocketChannel channel) { + try { + channel.configureBlocking(false); + final Selector selector = Selector.open(); + try { - chan.configureBlocking(false); - selector = SelectorManager.getInstance().register(chan, new SelectorManager.Listener() { - @Override - public void onSelect(SelectorSelection sel) { + channel.register(selector, SelectionKey.OP_ACCEPT); + } catch (ClosedChannelException ex) { + try { + selector.close(); + } catch (IOException ignore) {} + + throw ex; + } + + // Update object instance for later cleanup. + this.selector = selector; + + while (!isStopped()) { + int count = selector.select(10); + if (count == 0) { + continue; + } + + Set<SelectionKey> keys = selector.selectedKeys(); + + for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) { + final SelectionKey key = i.next(); + if (key.isAcceptable()) { try { - SocketChannel sc = chan.accept(); + SocketChannel sc = channel.accept(); if (sc != null) { if (isStopped() || getAcceptListener() == null) { sc.close(); @@ -319,56 +357,55 @@ public class TcpTransportServer extends TransportServerThreadSupport implements } } } + } catch (SocketTimeoutException ste) { + // expect this to happen } catch (Exception e) { - onError(sel, e); - } - } - @Override - public void onError(SelectorSelection sel, Throwable error) { - Exception e = null; - if (error instanceof Exception) { - e = (Exception)error; - } else { - e = new Exception(error); - } - if (!isStopping()) { - onAcceptError(e); - } else if (!isStopped()) { - LOG.warn("run()", e); - onAcceptError(e); + e.printStackTrace(); + if (!isStopping()) { + onAcceptError(e); + } else if (!isStopped()) { + LOG.warn("run()", e); + onAcceptError(e); + } } } - }); - selector.setInterestOps(SelectionKey.OP_ACCEPT); - selector.enable(); - } catch (IOException ex) { - selector = null; + i.remove(); + } } - } else { - while (!isStopped()) { - Socket socket = null; - try { - socket = serverSocket.accept(); - if 
(socket != null) { - if (isStopped() || getAcceptListener() == null) { - socket.close(); + } catch (IOException ex) { + if (!isStopping()) { + onAcceptError(ex); + } else if (!isStopped()) { + LOG.warn("run()", ex); + onAcceptError(ex); + } + } + } + + private void doRunWithServerSocket(final ServerSocket serverSocket) { + while (!isStopped()) { + Socket socket = null; + try { + socket = serverSocket.accept(); + if (socket != null) { + if (isStopped() || getAcceptListener() == null) { + socket.close(); + } else { + if (useQueueForAccept) { + socketQueue.put(socket); } else { - if (useQueueForAccept) { - socketQueue.put(socket); - } else { - handleSocket(socket); - } + handleSocket(socket); } } - } catch (SocketTimeoutException ste) { - // expect this to happen - } catch (Exception e) { - if (!isStopping()) { - onAcceptError(e); - } else if (!isStopped()) { - LOG.warn("run()", e); - onAcceptError(e); - } + } + } catch (SocketTimeoutException ste) { + // expect this to happen + } catch (Exception e) { + if (!isStopping()) { + onAcceptError(e); + } else if (!isStopped()) { + LOG.warn("run()", e); + onAcceptError(e); } } } @@ -458,20 +495,43 @@ public class TcpTransportServer extends TransportServerThreadSupport implements @Override protected void doStop(ServiceStopper stopper) throws Exception { - if (selector != null) { - selector.disable(); - selector.close(); - selector = null; + + Exception firstFailure = null; + + try { + if (selector != null) { + selector.close(); + selector = null; + } + } catch (Exception error) { } - if (serverSocket != null) { - serverSocket.close(); - serverSocket = null; + + try { + final ServerSocket serverSocket = this.serverSocket; + if (serverSocket != null) { + this.serverSocket = null; + serverSocket.close(); + } + } catch (Exception error) { + firstFailure = error; } + if (socketHandlerThread != null) { socketHandlerThread.interrupt(); socketHandlerThread = null; } - super.doStop(stopper); + + try { + super.doStop(stopper); + } 
catch (Exception error) { + if (firstFailure != null) { + firstFailure = error; + } + } + + if (firstFailure != null) { + throw firstFailure; + } } @Override
