Repository: activemq Updated Branches: refs/heads/master c2ad0c325 -> 934a30a32
https://issues.apache.org/jira/browse/AMQ-6184 - improve nio transport scalability Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/934a30a3 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/934a30a3 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/934a30a3 Branch: refs/heads/master Commit: 934a30a327c46100224c6822be2d947c5d848afb Parents: c2ad0c3 Author: Dejan Bosanac <de...@nighttale.net> Authored: Thu Feb 25 12:07:41 2016 +0100 Committer: Dejan Bosanac <de...@nighttale.net> Committed: Thu Feb 25 12:07:51 2016 +0100 ---------------------------------------------------------------------- .../activemq/transport/nio/NIOTransport.java | 5 +- .../activemq/transport/nio/SelectorManager.java | 15 +--- .../transport/tcp/TcpTransportServer.java | 83 +++++++++++--------- 3 files changed, 53 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/934a30a3/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java index 0a15fa5..58ee1aa 100644 --- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java @@ -92,7 +92,7 @@ public class NIOTransport extends TcpTransport { // Send the data via the channel // inputBuffer = ByteBuffer.allocateDirect(8*1024); - inputBuffer = ByteBuffer.allocate(getIoBufferSize()); + inputBuffer = ByteBuffer.allocateDirect(getIoBufferSize()); currentBuffer = inputBuffer; nextFrameSize = -1; currentBuffer.limit(4); @@ -120,7 +120,6 @@ public class NIOTransport extends TcpTransport { } this.receiveCounter += readSize; - if (currentBuffer.hasRemaining()) { continue; } @@ -143,7 +142,7 @@ public class NIOTransport extends TcpTransport { } if (nextFrameSize > inputBuffer.capacity()) { - currentBuffer = ByteBuffer.allocate(nextFrameSize); + currentBuffer = ByteBuffer.allocateDirect(nextFrameSize); currentBuffer.putInt(nextFrameSize); } else { inputBuffer.limit(nextFrameSize); http://git-wip-us.apache.org/repos/asf/activemq/blob/934a30a3/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java b/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java index 1adb92f..bc50003 100644 --- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java @@ -19,12 +19,7 @@ package org.apache.activemq.transport.nio; import java.io.IOException; import java.nio.channels.spi.AbstractSelectableChannel; import java.util.LinkedList; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * The SelectorManager will manage one Selector and the thread that checks the @@ -43,7 +38,7 @@ public final class SelectorManager { private int maxChannelsPerWorker = 1024; protected ExecutorService createDefaultExecutor() { - ThreadPoolExecutor rc = new ThreadPoolExecutor(getDefaultCorePoolSize(), getDefaultMaximumPoolSize(), getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), + ThreadPoolExecutor rc = new ThreadPoolExecutor(getDefaultMaximumPoolSize(), getDefaultMaximumPoolSize(), getDefaultKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { private long i = 0; @@ -59,12 +54,8 @@ public final class SelectorManager { return rc; } - private static int getDefaultCorePoolSize() { - return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.corePoolSize", 0); - } - private static int getDefaultMaximumPoolSize() { - return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize", Integer.MAX_VALUE); + return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize", 1024); } private static int getDefaultKeepAliveTime() { http://git-wip-us.apache.org/repos/asf/activemq/blob/934a30a3/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java index 43e6ae0..f071522 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java @@ -27,9 +27,12 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.HashMap; +import java.util.Iterator; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -67,7 +70,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class); protected ServerSocket serverSocket; - protected SelectorSelection selector; + protected Selector selector; protected int backlog = 5000; protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory(); protected final TcpTransportFactory transportFactory; @@ -303,46 +306,57 @@ public class TcpTransportServer extends TransportServerThreadSupport implements if (chan != null) { try { chan.configureBlocking(false); - selector = SelectorManager.getInstance().register(chan, new SelectorManager.Listener() { - @Override - public void onSelect(SelectorSelection sel) { - try { - SocketChannel sc = chan.accept(); - if (sc != null) { - if (isStopped() || getAcceptListener() == null) { - sc.close(); - } else { - if (useQueueForAccept) { - socketQueue.put(sc.socket()); + selector = Selector.open(); + chan.register(selector, SelectionKey.OP_ACCEPT); + while (!isStopped()) { + int count = selector.select(10); + + if (count == 0) { + continue; + } + + Set<SelectionKey> keys = selector.selectedKeys(); + + for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) { + final SelectionKey key = i.next(); + if (key.isAcceptable()) { + try { + SocketChannel sc = chan.accept(); + if (sc != null) { + if (isStopped() || getAcceptListener() == null) { + sc.close(); } else { - handleSocket(sc.socket()); + if (useQueueForAccept) { + socketQueue.put(sc.socket()); + } else { + handleSocket(sc.socket()); + } } } + + } catch (SocketTimeoutException ste) { + // expect this to happen + } catch (Exception e) { + e.printStackTrace(); + if (!isStopping()) { + onAcceptError(e); + } else if (!isStopped()) { + LOG.warn("run()", e); + onAcceptError(e); + } } - } catch (Exception e) { - onError(sel, e); - } - } - @Override - public void onError(SelectorSelection sel, Throwable error) { - Exception e = null; - if (error instanceof Exception) { - e = (Exception)error; - } else { - e = new Exception(error); - } - if (!isStopping()) { - onAcceptError(e); - } else if (!isStopped()) { - LOG.warn("run()", e); - onAcceptError(e); } + i.remove(); } - }); - selector.setInterestOps(SelectionKey.OP_ACCEPT); - selector.enable(); + + } } catch (IOException ex) { - selector = null; + if (selector != null) { + try { + selector.close(); + } catch (IOException ioe) {} + selector = null; + } } } else { while (!isStopped()) { @@ -459,7 +473,6 @@ public class TcpTransportServer extends TransportServerThreadSupport implements @Override protected void doStop(ServiceStopper stopper) throws Exception { if (selector != null) { - selector.disable(); selector.close(); selector = null; }