Repository: activemq
Updated Branches:
  refs/heads/master e5a94bfee -> e14aca871


http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportServer.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportServer.java
 
b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportServer.java
new file mode 100644
index 0000000..dd2655e
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportServer.java
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.auto;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.net.ServerSocketFactory;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.openwire.OpenWireFormatFactory;
+import org.apache.activemq.transport.InactivityIOException;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.protocol.AmqpProtocolVerifier;
+import org.apache.activemq.transport.protocol.MqttProtocolVerifier;
+import org.apache.activemq.transport.protocol.OpenWireProtocolVerifier;
+import org.apache.activemq.transport.protocol.ProtocolVerifier;
+import org.apache.activemq.transport.protocol.StompProtocolVerifier;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+import org.apache.activemq.util.FactoryFinder;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A TCP based implementation of {@link TransportServer}
+ */
+public class AutoTcpTransportServer extends TcpTransportServer {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AutoTcpTransportServer.class);
+
+    protected Map<String, Map<String, Object>> wireFormatOptions;
+    protected Map<String, Object> autoTransportOptions;
+    protected Set<String> enabledProtocols;
+    protected final Map<String, ProtocolVerifier> protocolVerifiers = new 
ConcurrentHashMap<String, ProtocolVerifier>();
+
+    protected BrokerService brokerService;
+
+    protected int maxConnectionThreadPoolSize = Integer.MAX_VALUE;
+    protected int protocolDetectionTimeOut = 30000;
+
+    private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new 
FactoryFinder("META-INF/services/org/apache/activemq/transport/");
+    private final ConcurrentMap<String, TransportFactory> transportFactories = 
new ConcurrentHashMap<String, TransportFactory>();
+
+    private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new 
FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
+
+    public WireFormatFactory findWireFormatFactory(String scheme, Map<String, 
Map<String, Object>> options) throws IOException {
+        WireFormatFactory wff = null;
+        try {
+            wff = 
(WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(scheme);
+            if (options != null) {
+                IntrospectionSupport.setProperties(wff, 
options.get(AutoTransportUtils.ALL));
+                IntrospectionSupport.setProperties(wff, options.get(scheme));
+            }
+            if (wff instanceof OpenWireFormatFactory) {
+                protocolVerifiers.put(AutoTransportUtils.OPENWIRE, new 
OpenWireProtocolVerifier((OpenWireFormatFactory) wff));
+            }
+            return wff;
+        } catch (Throwable e) {
+           throw IOExceptionSupport.create("Could not create wire format 
factory for: " + scheme + ", reason: " + e, e);
+        }
+    }
+
+    public TransportFactory findTransportFactory(String scheme, Map<String, ?> 
options) throws IOException {
+        scheme = append(scheme, "nio");
+        scheme = append(scheme, "ssl");
+
+        if (scheme.isEmpty()) {
+            scheme = "tcp";
+        }
+
+        TransportFactory tf = transportFactories.get(scheme);
+        if (tf == null) {
+            // Try to load if from a META-INF property.
+            try {
+                tf = 
(TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
+                if (options != null) {
+                    IntrospectionSupport.setProperties(tf, options);
+                }
+                transportFactories.put(scheme, tf);
+            } catch (Throwable e) {
+                throw IOExceptionSupport.create("Transport scheme NOT 
recognized: [" + scheme + "]", e);
+            }
+        }
+        return tf;
+    }
+
+    protected String append(String currentScheme, String scheme) {
+        if (this.getBindLocation().getScheme().contains(scheme)) {
+            if (!currentScheme.isEmpty()) {
+                currentScheme += "+";
+            }
+            currentScheme += scheme;
+        }
+        return currentScheme;
+    }
+
+    /**
+     * @param transportFactory
+     * @param location
+     * @param serverSocketFactory
+     * @throws IOException
+     * @throws URISyntaxException
+     */
+    public AutoTcpTransportServer(TcpTransportFactory transportFactory,
+            URI location, ServerSocketFactory serverSocketFactory, 
BrokerService brokerService,
+            Set<String> enabledProtocols)
+            throws IOException, URISyntaxException {
+        super(transportFactory, location, serverSocketFactory);
+
+        //Use an executor service here to handle new connections.  Setting the 
max number
+        //of threads to the maximum number of connections the thread count 
isn't unbounded
+        service = new ThreadPoolExecutor(maxConnectionThreadPoolSize,
+                maxConnectionThreadPoolSize,
+                30L, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>());
+        //allow the thread pool to shrink if the max number of threads isn't 
needed
+        service.allowCoreThreadTimeOut(true);
+
+        this.brokerService = brokerService;
+        this.enabledProtocols = enabledProtocols;
+        initProtocolVerifiers();
+    }
+
+    public int getMaxConnectionThreadPoolSize() {
+        return maxConnectionThreadPoolSize;
+    }
+
+    public void setMaxConnectionThreadPoolSize(int 
maxConnectionThreadPoolSize) {
+        this.maxConnectionThreadPoolSize = maxConnectionThreadPoolSize;
+        service.setCorePoolSize(maxConnectionThreadPoolSize);
+        service.setMaximumPoolSize(maxConnectionThreadPoolSize);
+    }
+
+    public void setProtocolDetectionTimeOut(int protocolDetectionTimeOut) {
+        this.protocolDetectionTimeOut = protocolDetectionTimeOut;
+    }
+
+    @Override
+    public void setWireFormatFactory(WireFormatFactory factory) {
+        super.setWireFormatFactory(factory);
+        initOpenWireProtocolVerifier();
+    }
+
+    protected void initProtocolVerifiers() {
+        initOpenWireProtocolVerifier();
+
+        if (isAllProtocols() || 
enabledProtocols.contains(AutoTransportUtils.AMQP)) {
+            protocolVerifiers.put(AutoTransportUtils.AMQP, new 
AmqpProtocolVerifier());
+        }
+        if (isAllProtocols() || 
enabledProtocols.contains(AutoTransportUtils.STOMP)) {
+            protocolVerifiers.put(AutoTransportUtils.STOMP, new 
StompProtocolVerifier());
+        }
+        if (isAllProtocols()|| 
enabledProtocols.contains(AutoTransportUtils.MQTT)) {
+            protocolVerifiers.put(AutoTransportUtils.MQTT, new 
MqttProtocolVerifier());
+        }
+    }
+
+    protected void initOpenWireProtocolVerifier() {
+        if (isAllProtocols() || 
enabledProtocols.contains(AutoTransportUtils.OPENWIRE)) {
+            OpenWireProtocolVerifier owpv;
+            if (wireFormatFactory instanceof OpenWireFormatFactory) {
+                owpv = new OpenWireProtocolVerifier((OpenWireFormatFactory) 
wireFormatFactory);
+            } else {
+                owpv = new OpenWireProtocolVerifier(new 
OpenWireFormatFactory());
+            }
+            protocolVerifiers.put(AutoTransportUtils.OPENWIRE, owpv);
+        }
+    }
+
+    protected boolean isAllProtocols() {
+        return enabledProtocols == null || enabledProtocols.isEmpty();
+    }
+
+
+    protected final ThreadPoolExecutor service;
+
+
+    /**
+     * This holds the initial buffer that has been read to detect the protocol.
+     */
+    public InitBuffer initBuffer;
+
+    @Override
+    protected void handleSocket(final Socket socket) {
+        final AutoTcpTransportServer server = this;
+        //This needs to be done in a new thread because
+        //the socket might be waiting on the client to send bytes
+        //doHandleSocket can't complete until the protocol can be detected
+        service.submit(new Runnable() {
+            @Override
+            public void run() {
+                server.doHandleSocket(socket);
+            }
+        });
+    }
+
+    @Override
+    protected TransportInfo configureTransport(final TcpTransportServer 
server, final Socket socket) throws Exception {
+        final InputStream is = socket.getInputStream();
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+
+        final AtomicInteger readBytes = new AtomicInteger(0);
+        final ByteBuffer data = ByteBuffer.allocate(8);
+        // We need to peak at the first 8 bytes of the buffer to detect the 
protocol
+        Future<?> future = executor.submit(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    do {
+                        int read = is.read();
+                        if (read == -1) {
+                            throw new IOException("Connection faild, stream is 
closed.");
+                        }
+                        data.put((byte) read);
+                        readBytes.incrementAndGet();
+                    } while (readBytes.get() < 8);
+                } catch (Exception e) {
+                    throw new IllegalStateException(e);
+                }
+            }
+        });
+
+        try {
+            //Wait for protocolDetectionTimeOut if defined
+            if (protocolDetectionTimeOut > 0) {
+                future.get(protocolDetectionTimeOut, TimeUnit.MILLISECONDS);
+            } else {
+                future.get();
+            }
+            data.flip();
+        } catch (TimeoutException e) {
+            throw new InactivityIOException("Client timed out before wire 
format could be detected. " +
+                    " 8 bytes are required to detect the protocol but only: " 
+ readBytes + " were sent.");
+        }
+
+        ProtocolInfo protocolInfo = detectProtocol(data.array());
+
+        initBuffer = new InitBuffer(readBytes.get(), 
ByteBuffer.allocate(readBytes.get()));
+        initBuffer.buffer.put(data.array());
+
+        if (protocolInfo.detectedTransportFactory instanceof 
BrokerServiceAware) {
+            ((BrokerServiceAware) 
protocolInfo.detectedTransportFactory).setBrokerService(brokerService);
+        }
+
+        WireFormat format = 
protocolInfo.detectedWireFormatFactory.createWireFormat();
+        Transport transport = createTransport(socket, 
format,protocolInfo.detectedTransportFactory);
+
+        return new TransportInfo(format, transport, 
protocolInfo.detectedTransportFactory);
+    }
+
+    @Override
+    protected TcpTransport createTransport(Socket socket, WireFormat format) 
throws IOException {
+        return new TcpTransport(format, socket, this.initBuffer);
+    }
+
+    /**
+     * @param socket
+     * @param format
+     * @param detectedTransportFactory
+     * @return
+     */
+    protected TcpTransport createTransport(Socket socket, WireFormat format,
+            TcpTransportFactory detectedTransportFactory) throws IOException {
+        return createTransport(socket, format);
+    }
+
+    public void setWireFormatOptions(Map<String, Map<String, Object>> 
wireFormatOptions) {
+        this.wireFormatOptions = wireFormatOptions;
+    }
+
+    public void setEnabledProtocols(Set<String> enabledProtocols) {
+        this.enabledProtocols = enabledProtocols;
+    }
+
+    public void setAutoTransportOptions(Map<String, Object> 
autoTransportOptions) {
+        this.autoTransportOptions = autoTransportOptions;
+        if (autoTransportOptions.get("protocols") != null) {
+            this.enabledProtocols = AutoTransportUtils.parseProtocols((String) 
autoTransportOptions.get("protocols"));
+        }
+    }
+    @Override
+    protected void doStop(ServiceStopper stopper) throws Exception {
+        if (service != null) {
+            service.shutdown();
+        }
+        super.doStop(stopper);
+    }
+
+    protected ProtocolInfo detectProtocol(byte[] buffer) throws IOException {
+        TcpTransportFactory detectedTransportFactory = transportFactory;
+        WireFormatFactory detectedWireFormatFactory = wireFormatFactory;
+
+        boolean found = false;
+        for (String scheme : protocolVerifiers.keySet()) {
+            if (protocolVerifiers.get(scheme).isProtocol(buffer)) {
+                LOG.debug("Detected " + scheme);
+                detectedWireFormatFactory = findWireFormatFactory(scheme, 
wireFormatOptions);
+
+                if (scheme.equals("default")) {
+                    scheme = "";
+                }
+
+                detectedTransportFactory = (TcpTransportFactory) 
findTransportFactory(scheme, transportOptions);
+                found = true;
+                break;
+            }
+        }
+
+        if (!found) {
+            throw new IllegalStateException("Could not detect wire format");
+        }
+
+        return new ProtocolInfo(detectedTransportFactory, 
detectedWireFormatFactory);
+
+    }
+
+    protected class ProtocolInfo {
+        public final TcpTransportFactory detectedTransportFactory;
+        public final WireFormatFactory detectedWireFormatFactory;
+
+        public ProtocolInfo(TcpTransportFactory detectedTransportFactory,
+                WireFormatFactory detectedWireFormatFactory) {
+            super();
+            this.detectedTransportFactory = detectedTransportFactory;
+            this.detectedWireFormatFactory = detectedWireFormatFactory;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTransportUtils.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTransportUtils.java
 
b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTransportUtils.java
new file mode 100644
index 0000000..14823db
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTransportUtils.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.auto;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.activemq.util.IntrospectionSupport;
+
+/**
+ *
+ *
+ */
+public class AutoTransportUtils {
+
+    //wireformats
+    public static String ALL = "all";
+    public static String OPENWIRE = "default";
+    public static String STOMP = "stomp";
+    public static String AMQP = "amqp";
+    public static String MQTT = "mqtt";
+
+    //transports
+    public static String AUTO = "auto";
+
+    public static Map<String, Map<String, Object>> 
extractWireFormatOptions(Map<String, String> options ) {
+        Map<String, Map<String, Object>> wireFormatOptions = new HashMap<>();
+        if (options != null) {
+            wireFormatOptions.put(OPENWIRE, 
IntrospectionSupport.extractProperties(options, "wireFormat.default."));
+            wireFormatOptions.put(STOMP, 
IntrospectionSupport.extractProperties(options, "wireFormat.stomp."));
+            wireFormatOptions.put(AMQP, 
IntrospectionSupport.extractProperties(options, "wireFormat.amqp."));
+            wireFormatOptions.put(MQTT, 
IntrospectionSupport.extractProperties(options, "wireFormat.mqtt."));
+            wireFormatOptions.put(ALL, 
IntrospectionSupport.extractProperties(options, "wireFormat."));
+        }
+        return wireFormatOptions;
+    }
+
+    public static Set<String> parseProtocols(String protocolString) {
+        Set<String> protocolSet = new HashSet<>();;
+        if (protocolString != null && !protocolString.isEmpty()) {
+            protocolSet.addAll(Arrays.asList(protocolString.split(",")));
+        }
+        return protocolSet;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java
 
b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java
new file mode 100644
index 0000000..a04bc6e
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java
@@ -0,0 +1,159 @@
+package org.apache.activemq.transport.auto.nio;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.net.ServerSocketFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.transport.InactivityIOException;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.auto.AutoTcpTransportServer;
+import org.apache.activemq.transport.nio.AutoInitNioSSLTransport;
+import org.apache.activemq.transport.nio.NIOSSLTransport;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+import org.apache.activemq.wireformat.WireFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class AutoNIOSSLTransportServer extends AutoTcpTransportServer {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AutoNIOSSLTransportServer.class);
+
+    private SSLContext context;
+
+    public AutoNIOSSLTransportServer(SSLContext context, TcpTransportFactory 
transportFactory, URI location, ServerSocketFactory serverSocketFactory,
+            BrokerService brokerService, Set<String> enabledProtocols) throws 
IOException, URISyntaxException {
+        super(transportFactory, location, serverSocketFactory, brokerService, 
enabledProtocols);
+
+        this.context = context;
+    }
+
+    private boolean needClientAuth;
+    private boolean wantClientAuth;
+
+    protected Transport createTransport(Socket socket, WireFormat format, 
SSLEngine engine,
+            InitBuffer initBuffer, ByteBuffer inputBuffer, TcpTransportFactory 
detectedFactory) throws IOException {
+        NIOSSLTransport transport = new NIOSSLTransport(format, socket, 
engine, initBuffer, inputBuffer);
+        if (context != null) {
+            transport.setSslContext(context);
+        }
+
+        transport.setNeedClientAuth(needClientAuth);
+        transport.setWantClientAuth(wantClientAuth);
+
+
+        return transport;
+    }
+
+    @Override
+    protected TcpTransport createTransport(Socket socket, WireFormat format) 
throws IOException {
+        throw new UnsupportedOperationException("method not supported");
+    }
+
+    @Override
+    public boolean isSslServer() {
+        return true;
+    }
+
+    public boolean isNeedClientAuth() {
+        return this.needClientAuth;
+    }
+
+    public void setNeedClientAuth(boolean value) {
+        this.needClientAuth = value;
+    }
+
+    public boolean isWantClientAuth() {
+        return this.wantClientAuth;
+    }
+
+    public void setWantClientAuth(boolean value) {
+        this.wantClientAuth = value;
+    }
+
+
+    @Override
+    protected TransportInfo configureTransport(final TcpTransportServer 
server, final Socket socket) throws Exception {
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+
+        //The SSLEngine needs to be initialized and handshake done to get the 
first command and detect the format
+        final AutoInitNioSSLTransport in = new 
AutoInitNioSSLTransport(wireFormatFactory.createWireFormat(), socket);
+        if (context != null) {
+            in.setSslContext(context);
+        }
+        in.start();
+        SSLEngine engine = in.getSslSession();
+
+        Future<Integer> future = executor.submit(new Callable<Integer>() {
+            @Override
+            public Integer call() throws Exception {
+                //Wait for handshake to finish initializing
+                do {
+                    in.serviceRead();
+                } while(in.readSize < 8);
+
+                return in.readSize;
+            }
+        });
+
+        try {
+            future.get(protocolDetectionTimeOut, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+            throw new InactivityIOException("Client timed out before wire 
format could be detected. " +
+                    " 8 bytes are required to detect the protocol but only: " 
+ in.readSize + " were sent.");
+        }
+
+        in.stop();
+
+        initBuffer = new InitBuffer(in.readSize, 
ByteBuffer.allocate(in.read.length));
+        initBuffer.buffer.put(in.read);
+
+        ProtocolInfo protocolInfo = detectProtocol(in.read);
+
+        if (protocolInfo.detectedTransportFactory instanceof 
BrokerServiceAware) {
+            ((BrokerServiceAware) 
protocolInfo.detectedTransportFactory).setBrokerService(brokerService);
+        }
+
+        WireFormat format = 
protocolInfo.detectedWireFormatFactory.createWireFormat();
+        Transport transport = createTransport(socket, format, engine, 
initBuffer, in.getInputBuffer(), protocolInfo.detectedTransportFactory);
+
+        return new TransportInfo(format, transport, 
protocolInfo.detectedTransportFactory);
+    }
+
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOTransport.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOTransport.java
 
b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOTransport.java
new file mode 100644
index 0000000..a9a9f5d
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOTransport.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.auto.nio;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.UnknownHostException;
+
+import javax.net.SocketFactory;
+
+import org.apache.activemq.transport.nio.NIOTransport;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ *
+ *
+ */
+public class AutoNIOTransport extends NIOTransport {
+
+    public AutoNIOTransport(WireFormat format, Socket socket,
+            InitBuffer initBuffer) throws IOException {
+        super(format, socket, initBuffer);
+    }
+
+    public AutoNIOTransport(WireFormat wireFormat, Socket socket)
+            throws IOException {
+        super(wireFormat, socket);
+    }
+
+    public AutoNIOTransport(WireFormat wireFormat, SocketFactory socketFactory,
+            URI remoteLocation, URI localLocation) throws UnknownHostException,
+            IOException {
+        super(wireFormat, socketFactory, remoteLocation, localLocation);
+    }
+
+
+    boolean doneInitBuffer = false;
+
+    /**
+     * Read from the initial buffer if it is set
+     */
+    @Override
+    protected int readFromBuffer() throws IOException {
+        int readSize = 0;
+        if (!doneInitBuffer) {
+            if (initBuffer == null || initBuffer.readSize < 8) {
+                throw new IOException("Protocol type could not be 
determined.");
+            }
+            if (nextFrameSize == -1) {
+                readSize = 4;
+                this.initBuffer.buffer.flip();
+                for (int i = 0; i < 4; i++) {
+                    currentBuffer.put(initBuffer.buffer.get());
+                }
+            } else {
+                for (int i = 0; i < 4; i++) {
+                    currentBuffer.put(initBuffer.buffer.get());
+                }
+                readSize = 4;
+                doneInitBuffer = true;
+            }
+
+        } else {
+            readSize += channel.read(currentBuffer);
+        }
+        return readSize;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNioSslTransportFactory.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNioSslTransportFactory.java
 
b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNioSslTransportFactory.java
new file mode 100644
index 0000000..8a29ab2
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNioSslTransportFactory.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.auto.nio;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import javax.net.ServerSocketFactory;
+import javax.net.ssl.SSLEngine;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.broker.SslContext;
+import org.apache.activemq.openwire.OpenWireFormatFactory;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.auto.AutoTcpTransportServer;
+import org.apache.activemq.transport.auto.AutoTransportUtils;
+import org.apache.activemq.transport.nio.NIOSSLTransport;
+import org.apache.activemq.transport.nio.NIOSSLTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ *
+ *
+ */
+public class AutoNioSslTransportFactory extends NIOSSLTransportFactory 
implements BrokerServiceAware {
+    protected BrokerService brokerService;
+
+    /* (non-Javadoc)
+     * @see 
org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService)
+     */
+    @Override
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerService = brokerService;
+    }
+
+    @Override
+    protected AutoNIOSSLTransportServer createTcpTransportServer(URI location, 
ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException 
{
+        return new AutoNIOSSLTransportServer(context, this, location, 
serverSocketFactory, brokerService, enabledProtocols) {
+
+            @Override
+            protected Transport createTransport(Socket socket, WireFormat 
format, SSLEngine engine, InitBuffer initBuffer,
+                     ByteBuffer inputBuffer, TcpTransportFactory 
detectedFactory) throws IOException {
+                NIOSSLTransport nioSslTransport = (NIOSSLTransport) 
detectedFactory.createTransport(
+                        format, socket, engine, initBuffer, inputBuffer);
+
+                if (format.getClass().toString().contains("MQTT")) {
+                    if (!allowLinkStealingSet) {
+                        this.setAllowLinkStealing(true);
+                    }
+                }
+
+                if (context != null) {
+                    nioSslTransport.setSslContext(context);
+                }
+
+                nioSslTransport.setNeedClientAuth(isNeedClientAuth());
+                nioSslTransport.setWantClientAuth(isWantClientAuth());
+
+                return nioSslTransport;
+            }
+
+        };
+
+    }
+
+    boolean allowLinkStealingSet = false;
+    private Set<String> enabledProtocols;
+
+    @Override
+    public TransportServer doBind(final URI location) throws IOException {
+        try {
+            if (SslContext.getCurrentSslContext() != null) {
+                try {
+                    context = 
SslContext.getCurrentSslContext().getSSLContext();
+                } catch (Exception e) {
+                    throw new IOException(e);
+                }
+            }
+
+            Map<String, String> options = new HashMap<String, 
String>(URISupport.parseParameters(location));
+
+            Map<String, Object> autoProperties = 
IntrospectionSupport.extractProperties(options, "auto.");
+            this.enabledProtocols = AutoTransportUtils.parseProtocols((String) 
autoProperties.get("protocols"));
+
+            ServerSocketFactory serverSocketFactory = 
createServerSocketFactory();
+            AutoTcpTransportServer server = createTcpTransportServer(location, 
serverSocketFactory);
+            server.setWireFormatFactory(new OpenWireFormatFactory());
+            if (options.get("allowLinkStealing") != null){
+                allowLinkStealingSet = true;
+            }
+            IntrospectionSupport.setProperties(server, options);
+            
server.setAutoTransportOptions(IntrospectionSupport.extractProperties(options, 
"auto."));
+            
server.setTransportOption(IntrospectionSupport.extractProperties(options, 
"transport."));
+            
server.setWireFormatOptions(AutoTransportUtils.extractWireFormatOptions(options));
+            server.bind();
+
+            return server;
+        } catch (URISyntaxException e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNioTransportFactory.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNioTransportFactory.java
 
b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNioTransportFactory.java
new file mode 100644
index 0000000..52244ff
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNioTransportFactory.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.auto.nio;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import javax.net.ServerSocketFactory;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.openwire.OpenWireFormatFactory;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.auto.AutoTcpTransportServer;
+import org.apache.activemq.transport.auto.AutoTransportUtils;
+import org.apache.activemq.transport.nio.NIOTransport;
+import org.apache.activemq.transport.nio.NIOTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ *
+ *
+ */
+public class AutoNioTransportFactory extends NIOTransportFactory implements 
BrokerServiceAware {
+    protected BrokerService brokerService;
+    /* (non-Javadoc)
+     * @see 
org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService)
+     */
+    @Override
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerService = brokerService;
+    }
+
+    @Override
+    protected AutoTcpTransportServer createTcpTransportServer(URI location, 
ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException 
{
+        return new AutoTcpTransportServer(this, location, serverSocketFactory, 
brokerService, enabledProtocols) {
+            @Override
+            protected TcpTransport createTransport(Socket socket, WireFormat 
format, TcpTransportFactory detectedTransportFactory) throws IOException {
+                TcpTransport nioTransport = null;
+                if 
(detectedTransportFactory.getClass().equals(NIOTransportFactory.class)) {
+                    nioTransport = new AutoNIOTransport(format, 
socket,this.initBuffer);
+                } else {
+                    nioTransport = detectedTransportFactory.createTransport(
+                            format, socket, this.initBuffer);
+                }
+
+                if (format.getClass().toString().contains("MQTT")) {
+                    if (!allowLinkStealingSet) {
+                        this.setAllowLinkStealing(true);
+                    }
+                }
+
+                return nioTransport;
+            }
+        };
+
+    }
+
+    boolean allowLinkStealingSet = false;
+    private Set<String> enabledProtocols;
+
+    @Override
+    public TransportServer doBind(final URI location) throws IOException {
+        try {
+            Map<String, String> options = new HashMap<String, 
String>(URISupport.parseParameters(location));
+
+            Map<String, Object> autoProperties = 
IntrospectionSupport.extractProperties(options, "auto.");
+            this.enabledProtocols = AutoTransportUtils.parseProtocols((String) 
autoProperties.get("protocols"));
+
+            ServerSocketFactory serverSocketFactory = 
createServerSocketFactory();
+            AutoTcpTransportServer server = createTcpTransportServer(location, 
serverSocketFactory);
+            //server.setWireFormatFactory(createWireFormatFactory(options));
+            server.setWireFormatFactory(new OpenWireFormatFactory());
+            if (options.get("allowLinkStealing") != null){
+                allowLinkStealingSet = true;
+            }
+            IntrospectionSupport.setProperties(server, options);
+            
server.setTransportOption(IntrospectionSupport.extractProperties(options, 
"transport."));
+            
server.setWireFormatOptions(AutoTransportUtils.extractWireFormatOptions(options));
+            server.bind();
+
+            return server;
+        } catch (URISyntaxException e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/nio/AutoInitNioSSLTransport.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/transport/nio/AutoInitNioSSLTransport.java
 
b/activemq-broker/src/main/java/org/apache/activemq/transport/nio/AutoInitNioSSLTransport.java
index f922e98..000ec41 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/transport/nio/AutoInitNioSSLTransport.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/transport/nio/AutoInitNioSSLTransport.java
@@ -172,17 +172,11 @@ public class AutoInitNioSSLTransport extends 
NIOSSLTransport {
 
             while (true) {
                 if (!plain.hasRemaining()) {
-
                     int readCount = secureRead(plain);
 
-                    if (readCount == 0) {
-                        break;
-                    }
-
                     // channel is closed, cleanup
                     if (readCount == -1) {
                         onException(new EOFException());
-                        selection.close();
                         break;
                     }
 
@@ -191,8 +185,11 @@ public class AutoInitNioSSLTransport extends 
NIOSSLTransport {
 
                 if (status == SSLEngineResult.Status.OK && handshakeStatus != 
SSLEngineResult.HandshakeStatus.NEED_UNWRAP) {
                     processCommand(plain);
-                    //Break when command is found
-                    break;
+                    //we have received enough bytes to detect the protocol
+                    if (receiveCounter >= 8) {
+                        readSize = receiveCounter;
+                        break;
+                    }
                 }
             }
         } catch (IOException e) {
@@ -204,8 +201,13 @@ public class AutoInitNioSSLTransport extends 
NIOSSLTransport {
 
     @Override
     protected void processCommand(ByteBuffer plain) throws Exception {
-        read = plain.array();
-        readSize = receiveCounter;
+        ByteBuffer newBuffer = ByteBuffer.allocate(receiveCounter);
+        if (read != null) {
+            newBuffer.put(read);
+        }
+        newBuffer.put(plain);
+        newBuffer.flip();
+        read = newBuffer.array();
     }
 
 
@@ -214,7 +216,6 @@ public class AutoInitNioSSLTransport extends 
NIOSSLTransport {
         taskRunnerFactory = new TaskRunnerFactory("ActiveMQ NIOSSLTransport 
Task");
         // no need to init as we can delay that until demand (eg in 
doHandshake)
         connect();
-        //super.doStart();
     }
 
 
@@ -224,10 +225,6 @@ public class AutoInitNioSSLTransport extends 
NIOSSLTransport {
             taskRunnerFactory.shutdownNow();
             taskRunnerFactory = null;
         }
-//        if (selection != null) {
-//            selection.close();
-//            selection = null;
-//        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/AmqpProtocolVerifier.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/AmqpProtocolVerifier.java
 
b/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/AmqpProtocolVerifier.java
new file mode 100644
index 0000000..fa6d6c6
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/AmqpProtocolVerifier.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.protocol;
+
+
+/**
+ *
+ *
+ */
+public class AmqpProtocolVerifier implements ProtocolVerifier {
+
+    static final byte[] PREFIX = new byte[] { 'A', 'M', 'Q', 'P' };
+
+    @Override
+    public boolean isProtocol(byte[] value) {
+        for (int i = 0; i < PREFIX.length; i++) {
+            if (value[i] != PREFIX[i]) {
+                return false;
+            }
+        }
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/MqttProtocolVerifier.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/MqttProtocolVerifier.java
 
b/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/MqttProtocolVerifier.java
new file mode 100644
index 0000000..e989f7e
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/MqttProtocolVerifier.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.protocol;
+
+/**
+ *
+ *
+ */
+public class MqttProtocolVerifier implements ProtocolVerifier {
+
+    /* (non-Javadoc)
+     * @see 
org.apache.activemq.broker.transport.protocol.ProtocolVerifier#isProtocol(byte[])
+     */
+    @Override
+    public boolean isProtocol(byte[] value) {
+        boolean mqtt311 = value[4] == 77 && // M
+                value[5] == 81 && // Q
+                value[6] == 84 && // T
+                value[7] == 84;   // T
+
+        boolean mqtt31  = value[4] == 77  && // M
+                        value[5] == 81  && // Q
+                        value[6] == 73  && // I
+                        value[7] == 115;   // s
+
+        return mqtt311 || mqtt31;
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/OpenWireProtocolVerifier.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/OpenWireProtocolVerifier.java
 
b/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/OpenWireProtocolVerifier.java
new file mode 100644
index 0000000..71277c2
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/OpenWireProtocolVerifier.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.protocol;
+
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.openwire.OpenWireFormatFactory;
+
+/**
+ *
+ *
+ */
+public class OpenWireProtocolVerifier  implements ProtocolVerifier {
+
+    protected final OpenWireFormatFactory wireFormatFactory;
+
+    public OpenWireProtocolVerifier(OpenWireFormatFactory wireFormatFactory) {
+        this.wireFormatFactory = wireFormatFactory;
+    }
+
+    /* (non-Javadoc)
+     * @see 
org.apache.activemq.broker.transport.protocol.ProtocolVerifier#isProtocol(byte[])
+     */
+    @Override
+    public boolean isProtocol(byte[] value) {
+        if (value.length < 8) {
+           throw new IllegalArgumentException("Protocol header length changed "
+                                                 + value.length);
+        }
+
+        int start = 
!((OpenWireFormat)wireFormatFactory.createWireFormat()).isSizePrefixDisabled() 
? 4 : 0;
+        int j = 0;
+        // type
+        if (value[start] != WireFormatInfo.DATA_STRUCTURE_TYPE) {
+           return false;
+        }
+        start++;
+        WireFormatInfo info = new WireFormatInfo();
+        final byte[] magic = info.getMagic();
+        int remainingLen = value.length - start;
+        int useLen = remainingLen > magic.length ? magic.length : remainingLen;
+        useLen += start;
+        // magic
+        for (int i = start; i < useLen; i++) {
+           if (value[i] != magic[j]) {
+              return false;
+           }
+           j++;
+        }
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/ProtocolVerifier.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/ProtocolVerifier.java
 
b/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/ProtocolVerifier.java
new file mode 100644
index 0000000..8bb2399
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/ProtocolVerifier.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.protocol;
+
+
+public interface ProtocolVerifier {
+
+    public boolean isProtocol(byte[] value);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/StompProtocolVerifier.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/StompProtocolVerifier.java
 
b/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/StompProtocolVerifier.java
new file mode 100644
index 0000000..5e7275d
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/StompProtocolVerifier.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.protocol;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ *
+ *
+ */
+public class StompProtocolVerifier implements ProtocolVerifier {
+
+    /* (non-Javadoc)
+     * @see 
org.apache.activemq.broker.transport.protocol.ProtocolVerifier#isProtocol(byte[])
+     */
+    @Override
+    public boolean isProtocol(byte[] value) {
+        String frameStart = new String(value, StandardCharsets.US_ASCII);
+        return frameStart.startsWith("CONNECT") || 
frameStart.startsWith("STOMP");
+    }
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto
 
b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto
index d6791eb..3af6c99 100644
--- 
a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto
+++ 
b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-class=org.apache.activemq.broker.transport.auto.AutoTcpTransportFactory
\ No newline at end of file
+class=org.apache.activemq.transport.auto.AutoTcpTransportFactory
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio
 
b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio
index a03cc27..d5d7802 100644
--- 
a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio
+++ 
b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-class=org.apache.activemq.broker.transport.auto.nio.AutoNioTransportFactory
\ No newline at end of file
+class=org.apache.activemq.transport.auto.nio.AutoNioTransportFactory
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio+ssl
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio+ssl
 
b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio+ssl
index 74a08c1..29972e3 100644
--- 
a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio+ssl
+++ 
b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio+ssl
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-class=org.apache.activemq.broker.transport.auto.nio.AutoNioSslTransportFactory
\ No newline at end of file
+class=org.apache.activemq.transport.auto.nio.AutoNioSslTransportFactory
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+ssl
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+ssl
 
b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+ssl
index d6e626e..23d1099 100644
--- 
a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+ssl
+++ 
b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+ssl
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-class=org.apache.activemq.broker.transport.auto.AutoSslTransportFactory
\ No newline at end of file
+class=org.apache.activemq.transport.auto.AutoSslTransportFactory
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
----------------------------------------------------------------------
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
 
b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
index eeb68d8..97148ac 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
@@ -73,8 +73,9 @@ public class NIOSSLTransport extends NIOTransport {
             ByteBuffer inputBuffer) throws IOException {
         super(wireFormat, socket, initBuffer);
         this.sslEngine = engine;
-        if (engine != null)
+        if (engine != null) {
             this.sslSession = engine.getSession();
+        }
         this.inputBuffer = inputBuffer;
     }
 
@@ -146,11 +147,13 @@ public class NIOSSLTransport extends NIOTransport {
             this.buffOut = outputStream;
 
             //If the sslEngine was not passed in, then handshake
-            if (!hasSslEngine)
+            if (!hasSslEngine) {
                 sslEngine.beginHandshake();
+            }
             handshakeStatus = sslEngine.getHandshakeStatus();
-            if (!hasSslEngine)
+            if (!hasSslEngine) {
                 doHandshake();
+            }
 
            // if (hasSslEngine) {
             selection = SelectorManager.getInstance().register(channel, new 
SelectorManager.Listener() {
@@ -328,27 +331,28 @@ public class NIOSSLTransport extends NIOTransport {
             currentBuffer.putInt(nextFrameSize);
 
         } else {
-
             // If its all in one read then we can just take it all, otherwise 
take only
             // the current frame size and the next iteration starts a new 
command.
-            if (currentBuffer.remaining() >= plain.remaining()) {
-                currentBuffer.put(plain);
-            } else {
-                byte[] fill = new byte[currentBuffer.remaining()];
-                plain.get(fill);
-                currentBuffer.put(fill);
-            }
+            if (currentBuffer != null) {
+                if (currentBuffer.remaining() >= plain.remaining()) {
+                    currentBuffer.put(plain);
+                } else {
+                    byte[] fill = new byte[currentBuffer.remaining()];
+                    plain.get(fill);
+                    currentBuffer.put(fill);
+                }
 
-            // Either we have enough data for a new command or we have to wait 
for some more.
-            if (currentBuffer.hasRemaining()) {
-                return;
-            } else {
-                currentBuffer.flip();
-                Object command = wireFormat.unmarshal(new DataInputStream(new 
NIOInputStream(currentBuffer)));
-                doConsume(command);
-                nextFrameSize = -1;
-                currentBuffer = null;
-           }
+                // Either we have enough data for a new command or we have to 
wait for some more.
+                if (currentBuffer.hasRemaining()) {
+                    return;
+                } else {
+                    currentBuffer.flip();
+                    Object command = wireFormat.unmarshal(new 
DataInputStream(new NIOInputStream(currentBuffer)));
+                    doConsume(command);
+                    nextFrameSize = -1;
+                    currentBuffer = null;
+               }
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
----------------------------------------------------------------------
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
 
b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
index c7fe00f..5896e74 100755
--- 
a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
@@ -481,47 +481,60 @@ public class TcpTransportServer extends 
TransportServerThreadSupport implements
 
     final protected void doHandleSocket(Socket socket) {
         boolean closeSocket = true;
+        boolean countIncremented = false;
         try {
-            if (this.currentTransportCount.get() >= this.maximumConnections) {
-                throw new ExceededMaximumConnectionsException(
-                    "Exceeded the maximum number of allowed client 
connections. See the '" +
-                    "maximumConnections' property on the TCP transport 
configuration URI " +
-                    "in the ActiveMQ configuration file (e.g., activemq.xml)");
-            } else {
-                currentTransportCount.incrementAndGet();
-
-                HashMap<String, Object> options = new HashMap<String, 
Object>();
-                options.put("maxInactivityDuration", 
Long.valueOf(maxInactivityDuration));
-                options.put("maxInactivityDurationInitalDelay", 
Long.valueOf(maxInactivityDurationInitalDelay));
-                options.put("minmumWireFormatVersion", 
Integer.valueOf(minmumWireFormatVersion));
-                options.put("trace", Boolean.valueOf(trace));
-                options.put("soTimeout", Integer.valueOf(soTimeout));
-                options.put("socketBufferSize", 
Integer.valueOf(socketBufferSize));
-                options.put("connectionTimeout", 
Integer.valueOf(connectionTimeout));
-                options.put("logWriterName", logWriterName);
-                options.put("dynamicManagement", 
Boolean.valueOf(dynamicManagement));
-                options.put("startLogging", Boolean.valueOf(startLogging));
-                options.putAll(transportOptions);
-
-                TransportInfo transportInfo = configureTransport(this, socket);
-                closeSocket = false;
-
-                if (transportInfo.transport instanceof ServiceSupport) {
-                    ((ServiceSupport) 
transportInfo.transport).addServiceListener(this);
-                }
+            int currentCount;
+            do {
+                currentCount = currentTransportCount.get();
+                if (currentCount >= this.maximumConnections) {
+                     throw new ExceededMaximumConnectionsException(
+                         "Exceeded the maximum number of allowed client 
connections. See the '" +
+                         "maximumConnections' property on the TCP transport 
configuration URI " +
+                         "in the ActiveMQ configuration file (e.g., 
activemq.xml)");
+                 }
+
+            //Increment this value before configuring the transport
+            //This is necessary because some of the transport servers must 
read from the
+            //socket during configureTransport() so we want to make sure this 
value is
+            //accurate as the transport server could pause here waiting for 
data to be sent from a client
+            } while(!currentTransportCount.compareAndSet(currentCount, 
currentCount + 1));
+            countIncremented = true;
+
+            HashMap<String, Object> options = new HashMap<String, Object>();
+            options.put("maxInactivityDuration", 
Long.valueOf(maxInactivityDuration));
+            options.put("maxInactivityDurationInitalDelay", 
Long.valueOf(maxInactivityDurationInitalDelay));
+            options.put("minmumWireFormatVersion", 
Integer.valueOf(minmumWireFormatVersion));
+            options.put("trace", Boolean.valueOf(trace));
+            options.put("soTimeout", Integer.valueOf(soTimeout));
+            options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
+            options.put("connectionTimeout", 
Integer.valueOf(connectionTimeout));
+            options.put("logWriterName", logWriterName);
+            options.put("dynamicManagement", 
Boolean.valueOf(dynamicManagement));
+            options.put("startLogging", Boolean.valueOf(startLogging));
+            options.putAll(transportOptions);
+
+            TransportInfo transportInfo = configureTransport(this, socket);
+            closeSocket = false;
+
+            if (transportInfo.transport instanceof ServiceSupport) {
+                ((ServiceSupport) 
transportInfo.transport).addServiceListener(this);
+            }
 
-                Transport configuredTransport = 
transportInfo.transportFactory.serverConfigure(
-                        transportInfo.transport, transportInfo.format, 
options);
+            Transport configuredTransport = 
transportInfo.transportFactory.serverConfigure(
+                    transportInfo.transport, transportInfo.format, options);
+
+            getAcceptListener().onAccept(configuredTransport);
 
-                getAcceptListener().onAccept(configuredTransport);
-            }
         } catch (SocketTimeoutException ste) {
             // expect this to happen
-            currentTransportCount.decrementAndGet();
         } catch (Exception e) {
-            currentTransportCount.decrementAndGet();
             if (closeSocket) {
                 try {
+                    //if closing the socket, only decrement the count it was 
actually incremented
+                    //where it was incremented
+                    if (countIncremented) {
+                        currentTransportCount.decrementAndGet();
+                    }
                     socket.close();
                 } catch (Exception ignore) {
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/auto/AutoStompConnectTimeoutTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/auto/AutoStompConnectTimeoutTest.java
 
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/auto/AutoStompConnectTimeoutTest.java
new file mode 100644
index 0000000..e7b372b
--- /dev/null
+++ 
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/auto/AutoStompConnectTimeoutTest.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp.auto;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.activemq.transport.auto.AutoTcpTransportServer;
+import org.apache.activemq.transport.stomp.StompTestSupport;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test that connection attempts that don't send the connect get cleaned by
+ * by the protocolDetectionTimeOut property
+ */
+@RunWith(Parameterized.class)
+public class AutoStompConnectTimeoutTest extends StompTestSupport {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AutoStompConnectTimeoutTest.class);
+
+    private Socket connection;
+    protected String connectorScheme;
+
+    @Parameters(name="{0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {"auto"},
+                {"auto+ssl"},
+                {"auto+nio"},
+                {"auto+nio+ssl"}
+            });
+    }
+
+    public AutoStompConnectTimeoutTest(String connectorScheme) {
+        this.connectorScheme = connectorScheme;
+    }
+
+    protected String getConnectorScheme() {
+        return connectorScheme;
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (Throwable e) {}
+            connection = null;
+        }
+        super.tearDown();
+    }
+
+    @Override
+    public String getAdditionalConfig() {
+        return "?protocolDetectionTimeOut=1500";
+    }
+
+    @Test(timeout = 15000)
+    public void testInactivityMonitor() throws Exception {
+
+        Thread t1 = new Thread() {
+
+            @Override
+            public void run() {
+                try {
+                    connection = createSocket();
+                    connection.getOutputStream().write('C');
+                    connection.getOutputStream().flush();
+                } catch (Exception ex) {
+                    LOG.error("unexpected exception on connect/disconnect", 
ex);
+                    exceptions.add(ex);
+                }
+            }
+        };
+
+        t1.start();
+
+        assertTrue("one connection", Wait.waitFor(new Wait.Condition() {
+             @Override
+             public boolean isSatisified() throws Exception {
+                 AutoTcpTransportServer server = (AutoTcpTransportServer) 
brokerService.getTransportConnectorByScheme(getConnectorScheme()).getServer();
+                 return 1 == server.getCurrentTransportCount().get();
+             }
+        }, TimeUnit.SECONDS.toMillis(15), 
TimeUnit.MILLISECONDS.toMillis(250)));
+
+        // and it should be closed due to inactivity
+        assertTrue("no dangling connections", Wait.waitFor(new 
Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                AutoTcpTransportServer server = (AutoTcpTransportServer) 
brokerService.getTransportConnectorByScheme(getConnectorScheme()).getServer();
+                return 0 == server.getCurrentTransportCount().get();
+            }
+        }, TimeUnit.SECONDS.toMillis(15), 
TimeUnit.MILLISECONDS.toMillis(500)));
+
+        assertTrue("no exceptions", exceptions.isEmpty());
+    }
+
+    @Override
+    protected boolean isUseTcpConnector() {
+        return false;
+    }
+    @Override
+    protected boolean isUseAutoConnector() {
+        return connectorScheme.equalsIgnoreCase("auto");
+    }
+
+    @Override
+    protected boolean isUseAutoSslConnector() {
+        return connectorScheme.equalsIgnoreCase("auto+ssl");
+    }
+
+    @Override
+    protected boolean isUseAutoNioConnector() {
+        return connectorScheme.equalsIgnoreCase("auto+nio");
+    }
+
+    @Override
+    protected boolean isUseAutoNioPlusSslConnector() {
+        return connectorScheme.equalsIgnoreCase("auto+nio+ssl");
+    }
+
+    @Override
+    protected Socket createSocket() throws IOException {
+
+        boolean useSSL = false;
+        int port = 0;
+
+        switch (connectorScheme) {
+            case "auto":
+                port = this.autoPort;
+                break;
+            case "auto+ssl":
+                useSSL = true;
+                port = this.autoSslPort;
+                break;
+            case "auto+nio":
+                port = this.autoNioPort;
+                break;
+            case "auto+nio+ssl":
+                useSSL = true;
+                port = this.autoNioSslPort;
+                break;
+            default:
+                throw new IOException("Invalid STOMP connector scheme passed 
to test.");
+        }
+
+        if (useSSL) {
+            return SSLSocketFactory.getDefault().createSocket("localhost", 
port);
+        } else {
+            return new Socket("localhost", port);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoNIOSslTransportBrokerTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoNIOSslTransportBrokerTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoNIOSslTransportBrokerTest.java
index 496ccf7..cb90b41 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoNIOSslTransportBrokerTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoNIOSslTransportBrokerTest.java
@@ -49,7 +49,6 @@ public class AutoNIOSslTransportBrokerTest extends 
TransportBrokerTestSupport {
         System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
         System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
         System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
-        //System.setProperty("javax.net.debug", 
"ssl,handshake,data,trustmanager");
 
         maxWait = 10000;
         super.setUp();

http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportMaxConnectionsTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportMaxConnectionsTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportMaxConnectionsTest.java
new file mode 100644
index 0000000..bbc20a3
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportMaxConnectionsTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.auto;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.springframework.jms.support.JmsUtils;
+
+@RunWith(Parameterized.class)
+public class AutoTransportMaxConnectionsTest {
+
+    public static final String KEYSTORE_TYPE = "jks";
+    public static final String PASSWORD = "password";
+    public static final String SERVER_KEYSTORE = 
"src/test/resources/server.keystore";
+    public static final String TRUST_KEYSTORE = 
"src/test/resources/client.keystore";
+    private static final int maxConnections = 20;
+
+    private final ExecutorService executor = Executors.newCachedThreadPool();
+    private String connectionUri;
+    private BrokerService service;
+    private TransportConnector connector;
+    private final String transportType;
+
+    @Parameters
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {"auto"},
+                {"auto+nio"},
+                {"auto+ssl"},
+                {"auto+nio+ssl"},
+            });
+    }
+
+
+    public AutoTransportMaxConnectionsTest(String transportType) {
+        super();
+        this.transportType = transportType;
+    }
+
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
+        System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
+        System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
+        System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
+        System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
+        System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
+
+        service = new BrokerService();
+        service.setPersistent(false);
+        service.setUseJmx(false);
+        connector = service.addConnector(transportType + 
"://0.0.0.0:0?maxConnectionThreadPoolSize=10&maximumConnections="+maxConnections);
+        connectionUri = connector.getPublishableConnectString();
+        service.start();
+        service.waitUntilStarted();
+    }
+
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(connectionUri);
+    }
+
+    @Test
+    public void testMaxConnectionControl() throws Exception {
+        final ConnectionFactory cf = createConnectionFactory();
+        final CountDownLatch startupLatch = new CountDownLatch(1);
+
+        for(int i = 0; i < maxConnections + 20; i++) {
+            executor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    Connection conn = null;
+                    try {
+                        startupLatch.await();
+                        conn = cf.createConnection();
+                        conn.start();
+                    } catch (Exception e) {
+                        //JmsUtils.closeConnection(conn);
+                    }
+                }
+            });
+        }
+
+        TcpTransportServer transportServer = 
(TcpTransportServer)connector.getServer();
+        // ensure the max connections is in effect
+        assertEquals(maxConnections, transportServer.getMaximumConnections());
+        // No connections at first
+        assertEquals(0, connector.getConnections().size());
+        // Release the latch to set up connections in parallel
+        startupLatch.countDown();
+
+        final TransportConnector connector = this.connector;
+
+        // Expect the max connections is created
+        assertTrue("Expected: " + maxConnections + " found: " + 
connector.getConnections().size(),
+            Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return connector.getConnections().size() == maxConnections;
+                }
+            })
+        );
+
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        executor.shutdown();
+
+        service.stop();
+        service.waitUntilStopped();
+    }
+}

Reply via email to