Repository: activemq
Updated Branches:
  refs/heads/master c2ad0c325 -> 934a30a32


https://issues.apache.org/jira/browse/AMQ-6184 - improve nio transport 
scalability


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/934a30a3
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/934a30a3
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/934a30a3

Branch: refs/heads/master
Commit: 934a30a327c46100224c6822be2d947c5d848afb
Parents: c2ad0c3
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Thu Feb 25 12:07:41 2016 +0100
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Thu Feb 25 12:07:51 2016 +0100

----------------------------------------------------------------------
 .../activemq/transport/nio/NIOTransport.java    |  5 +-
 .../activemq/transport/nio/SelectorManager.java | 15 +---
 .../transport/tcp/TcpTransportServer.java       | 83 +++++++++++---------
 3 files changed, 53 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/934a30a3/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
----------------------------------------------------------------------
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
 
b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
index 0a15fa5..58ee1aa 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
@@ -92,7 +92,7 @@ public class NIOTransport extends TcpTransport {
 
         // Send the data via the channel
         // inputBuffer = ByteBuffer.allocateDirect(8*1024);
-        inputBuffer = ByteBuffer.allocate(getIoBufferSize());
+        inputBuffer = ByteBuffer.allocateDirect(getIoBufferSize());
         currentBuffer = inputBuffer;
         nextFrameSize = -1;
         currentBuffer.limit(4);
@@ -120,7 +120,6 @@ public class NIOTransport extends TcpTransport {
                 }
 
                 this.receiveCounter += readSize;
-
                 if (currentBuffer.hasRemaining()) {
                     continue;
                 }
@@ -143,7 +142,7 @@ public class NIOTransport extends TcpTransport {
                     }
 
                     if (nextFrameSize > inputBuffer.capacity()) {
-                        currentBuffer = ByteBuffer.allocate(nextFrameSize);
+                        currentBuffer = 
ByteBuffer.allocateDirect(nextFrameSize);
                         currentBuffer.putInt(nextFrameSize);
                     } else {
                         inputBuffer.limit(nextFrameSize);

http://git-wip-us.apache.org/repos/asf/activemq/blob/934a30a3/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
----------------------------------------------------------------------
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
 
b/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
index 1adb92f..bc50003 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
@@ -19,12 +19,7 @@ package org.apache.activemq.transport.nio;
 import java.io.IOException;
 import java.nio.channels.spi.AbstractSelectableChannel;
 import java.util.LinkedList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 /**
  * The SelectorManager will manage one Selector and the thread that checks the
@@ -43,7 +38,7 @@ public final class SelectorManager {
     private int maxChannelsPerWorker = 1024;
 
     protected ExecutorService createDefaultExecutor() {
-        ThreadPoolExecutor rc = new 
ThreadPoolExecutor(getDefaultCorePoolSize(), getDefaultMaximumPoolSize(), 
getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+        ThreadPoolExecutor rc = new 
ThreadPoolExecutor(getDefaultMaximumPoolSize(), getDefaultMaximumPoolSize(), 
getDefaultKeepAliveTime(), TimeUnit.SECONDS, new 
LinkedBlockingQueue<Runnable>(),
             new ThreadFactory() {
 
                 private long i = 0;
@@ -59,12 +54,8 @@ public final class SelectorManager {
         return rc;
     }
 
-    private static int getDefaultCorePoolSize() {
-        return 
Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.corePoolSize",
 0);
-    }
-
     private static int getDefaultMaximumPoolSize() {
-        return 
Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize",
 Integer.MAX_VALUE);
+        return 
Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize",
 1024);
     }
 
     private static int getDefaultKeepAliveTime() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/934a30a3/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
----------------------------------------------------------------------
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
 
b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
index 43e6ae0..f071522 100755
--- 
a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
@@ -27,9 +27,12 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -67,7 +70,7 @@ public class TcpTransportServer extends 
TransportServerThreadSupport implements
 
     private static final Logger LOG = 
LoggerFactory.getLogger(TcpTransportServer.class);
     protected ServerSocket serverSocket;
-    protected SelectorSelection selector;
+    protected Selector selector;
     protected int backlog = 5000;
     protected WireFormatFactory wireFormatFactory = new 
OpenWireFormatFactory();
     protected final TcpTransportFactory transportFactory;
@@ -303,46 +306,57 @@ public class TcpTransportServer extends 
TransportServerThreadSupport implements
         if (chan != null) {
             try {
                 chan.configureBlocking(false);
-                selector = SelectorManager.getInstance().register(chan, new 
SelectorManager.Listener() {
-                    @Override
-                    public void onSelect(SelectorSelection sel) {
-                        try {
-                            SocketChannel sc = chan.accept();
-                            if (sc != null) {
-                                if (isStopped() || getAcceptListener() == 
null) {
-                                    sc.close();
-                                } else {
-                                    if (useQueueForAccept) {
-                                        socketQueue.put(sc.socket());
+                selector = Selector.open();
+                chan.register(selector, SelectionKey.OP_ACCEPT);
+                while (!isStopped()) {
+                    int count = selector.select(10);
+
+                    if (count == 0) {
+                        continue;
+                    }
+
+                    Set<SelectionKey> keys = selector.selectedKeys();
+
+                    for (Iterator<SelectionKey> i = keys.iterator(); 
i.hasNext(); ) {
+                        final SelectionKey key = i.next();
+                        if (key.isAcceptable()) {
+                            try {
+                                SocketChannel sc = chan.accept();
+                                if (sc != null) {
+                                    if (isStopped() || getAcceptListener() == 
null) {
+                                        sc.close();
                                     } else {
-                                        handleSocket(sc.socket());
+                                        if (useQueueForAccept) {
+                                            socketQueue.put(sc.socket());
+                                        } else {
+                                            handleSocket(sc.socket());
+                                        }
                                     }
                                 }
+
+                            } catch (SocketTimeoutException ste) {
+                                // expect this to happen
+                            } catch (Exception e) {
+                                e.printStackTrace();
+                                if (!isStopping()) {
+                                    onAcceptError(e);
+                                } else if (!isStopped()) {
+                                    LOG.warn("run()", e);
+                                    onAcceptError(e);
+                                }
                             }
-                        } catch (Exception e) {
-                            onError(sel, e);
-                        }
-                    }
-                    @Override
-                    public void onError(SelectorSelection sel, Throwable 
error) {
-                        Exception e = null;
-                        if (error instanceof Exception) {
-                            e = (Exception)error;
-                        } else {
-                            e = new Exception(error);
-                        }
-                        if (!isStopping()) {
-                            onAcceptError(e);
-                        } else if (!isStopped()) {
-                            LOG.warn("run()", e);
-                            onAcceptError(e);
                         }
+                        i.remove();
                     }
-                });
-                selector.setInterestOps(SelectionKey.OP_ACCEPT);
-                selector.enable();
+
+                }
             } catch (IOException ex) {
-                selector = null;
+                if (selector != null) {
+                    try {
+                        selector.close();
+                    } catch (IOException ioe) {}
+                    selector = null;
+                }
             }
         } else {
             while (!isStopped()) {
@@ -459,7 +473,6 @@ public class TcpTransportServer extends 
TransportServerThreadSupport implements
     @Override
     protected void doStop(ServiceStopper stopper) throws Exception {
         if (selector != null) {
-            selector.disable();
             selector.close();
             selector = null;
         }

Reply via email to