Repository: activemq Updated Branches: refs/heads/master d7b5a62bb -> ff9987226
https://issues.apache.org/jira/browse/AMQ-6209 Better management of shared resources between the background run thread and the main start / stop thread. Makes sure to cleanup all resources before finally throwing on stop to prevent leaking and resources. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ff998722 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ff998722 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ff998722 Branch: refs/heads/master Commit: ff99872263981982bb1ebce93c07bfb8a28d4a06 Parents: d7b5a62 Author: Timothy Bish <[email protected]> Authored: Fri May 13 11:13:11 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Fri May 13 11:13:28 2016 -0400 ---------------------------------------------------------------------- .../transport/tcp/TcpTransportServer.java | 214 ++++++++++++------- 1 file changed, 132 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/ff998722/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java index f071522..33d9c72 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java @@ -26,6 +26,7 @@ import java.net.SocketTimeoutException; import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; +import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; @@ -50,8 +51,6 @@ import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServerThreadSupport; -import org.apache.activemq.transport.nio.SelectorManager; -import org.apache.activemq.transport.nio.SelectorSelection; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.InetAddressUtil; import org.apache.activemq.util.IntrospectionSupport; @@ -69,8 +68,9 @@ import org.slf4j.LoggerFactory; public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener { private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class); - protected ServerSocket serverSocket; - protected Selector selector; + + protected volatile ServerSocket serverSocket; + protected volatile Selector selector; protected int backlog = 5000; protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory(); protected final TcpTransportFactory transportFactory; @@ -113,14 +113,14 @@ public class TcpTransportServer extends TransportServerThreadSupport implements */ protected boolean startLogging = true; protected final ServerSocketFactory serverSocketFactory; - protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>(); + protected final BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>(); protected Thread socketHandlerThread; /** * The maximum number of sockets allowed for this server */ protected int maximumConnections = Integer.MAX_VALUE; - protected AtomicInteger currentTransportCount = new AtomicInteger(); + protected final AtomicInteger currentTransportCount = new AtomicInteger(); public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { @@ -137,8 +137,8 @@ public class TcpTransportServer extends TransportServerThreadSupport implements InetAddress addr = InetAddress.getByName(host); try { - this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr); - configureServerSocket(this.serverSocket); + serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr); + configureServerSocket(serverSocket); } catch (IOException e) { throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e); } @@ -146,7 +146,6 @@ public class TcpTransportServer extends TransportServerThreadSupport implements setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment())); } catch (URISyntaxException e) { - // it could be that the host name contains invalid characters such // as _ on unix platforms so lets try use the IP address instead try { @@ -302,87 +301,114 @@ public class TcpTransportServer extends TransportServerThreadSupport implements */ @Override public void run() { - final ServerSocketChannel chan = serverSocket.getChannel(); - if (chan != null) { + if (!isStopped() && !isStopping()) { + final ServerSocket serverSocket = this.serverSocket; + if (serverSocket == null) { + onAcceptError(new IOException("Server started without a valid ServerSocket")); + } + + final ServerSocketChannel channel = serverSocket.getChannel(); + if (channel != null) { + doRunWithServerSocketChannel(channel); + } else { + doRunWithServerSocket(serverSocket); + } + } + } + + private void doRunWithServerSocketChannel(final ServerSocketChannel channel) { + try { + channel.configureBlocking(false); + final Selector selector = Selector.open(); + try { - chan.configureBlocking(false); - selector = Selector.open(); - chan.register(selector, SelectionKey.OP_ACCEPT); - while (!isStopped()) { - int count = selector.select(10); - - if (count == 0) { - continue; - } + channel.register(selector, SelectionKey.OP_ACCEPT); + } catch (ClosedChannelException ex) { + try { + selector.close(); + } catch (IOException ignore) {} - Set<SelectionKey> keys = selector.selectedKeys(); + throw ex; + } + + // Update object instance for later cleanup. + this.selector = selector; + + while (!isStopped()) { + int count = selector.select(10); + + if (count == 0) { + continue; + } - for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) { - final SelectionKey key = i.next(); - if (key.isAcceptable()) { - try { - SocketChannel sc = chan.accept(); - if (sc != null) { - if (isStopped() || getAcceptListener() == null) { - sc.close(); + Set<SelectionKey> keys = selector.selectedKeys(); + + for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) { + final SelectionKey key = i.next(); + if (key.isAcceptable()) { + try { + SocketChannel sc = channel.accept(); + if (sc != null) { + if (isStopped() || getAcceptListener() == null) { + sc.close(); + } else { + if (useQueueForAccept) { + socketQueue.put(sc.socket()); } else { - if (useQueueForAccept) { - socketQueue.put(sc.socket()); - } else { - handleSocket(sc.socket()); - } + handleSocket(sc.socket()); } } + } - } catch (SocketTimeoutException ste) { - // expect this to happen - } catch (Exception e) { - e.printStackTrace(); - if (!isStopping()) { - onAcceptError(e); - } else if (!isStopped()) { - LOG.warn("run()", e); - onAcceptError(e); - } + } catch (SocketTimeoutException ste) { + // expect this to happen + } catch (Exception e) { + e.printStackTrace(); + if (!isStopping()) { + onAcceptError(e); + } else if (!isStopped()) { + LOG.warn("run()", e); + onAcceptError(e); } } - i.remove(); } - - } - } catch (IOException ex) { - if (selector != null) { - try { - selector.close(); - } catch (IOException ioe) {} - selector = null; + i.remove(); } } - } else { - while (!isStopped()) { - Socket socket = null; - try { - socket = serverSocket.accept(); - if (socket != null) { - if (isStopped() || getAcceptListener() == null) { - socket.close(); + } catch (IOException ex) { + if (!isStopping()) { + onAcceptError(ex); + } else if (!isStopped()) { + LOG.warn("run()", ex); + onAcceptError(ex); + } + } + } + + private void doRunWithServerSocket(final ServerSocket serverSocket) { + while (!isStopped()) { + Socket socket = null; + try { + socket = serverSocket.accept(); + if (socket != null) { + if (isStopped() || getAcceptListener() == null) { + socket.close(); + } else { + if (useQueueForAccept) { + socketQueue.put(socket); } else { - if (useQueueForAccept) { - socketQueue.put(socket); - } else { - handleSocket(socket); - } + handleSocket(socket); } } - } catch (SocketTimeoutException ste) { - // expect this to happen - } catch (Exception e) { - if (!isStopping()) { - onAcceptError(e); - } else if (!isStopped()) { - LOG.warn("run()", e); - onAcceptError(e); - } + } + } catch (SocketTimeoutException ste) { + // expect this to happen + } catch (Exception e) { + if (!isStopping()) { + onAcceptError(e); + } else if (!isStopped()) { + LOG.warn("run()", e); + onAcceptError(e); } } } @@ -472,19 +498,43 @@ public class TcpTransportServer extends TransportServerThreadSupport implements @Override protected void doStop(ServiceStopper stopper) throws Exception { - if (selector != null) { - selector.close(); - selector = null; + Exception firstFailure = null; + + try { + final Selector selector = this.selector; + if (selector != null) { + this.selector = null; + selector.close(); + } + } catch (Exception error) { } - if (serverSocket != null) { - serverSocket.close(); - serverSocket = null; + + try { + final ServerSocket serverSocket = this.serverSocket; + if (serverSocket != null) { + this.serverSocket = null; + serverSocket.close(); + } + } catch (Exception error) { + firstFailure = error; } + if (socketHandlerThread != null) { socketHandlerThread.interrupt(); socketHandlerThread = null; } - super.doStop(stopper); + + try { + super.doStop(stopper); + } catch (Exception error) { + if (firstFailure != null) { + firstFailure = error; + } + } + + if (firstFailure != null) { + throw firstFailure; + } } @Override
