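Fix for https://issues.apache.org/jira/browse/AMQ-4719 - enable link stealing (setAllowLinkStealing(true)) on the transport servers created by the MQTT, MQTT NIO and MQTT NIO SSL transport factories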
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a482a68d Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a482a68d Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a482a68d Branch: refs/heads/activemq-5.9 Commit: a482a68d1fa2bcb023d171d078941c473080468a Parents: 11ad948 Author: Rob Davies <[email protected]> Authored: Tue Nov 12 08:08:42 2013 +0000 Committer: Hadrian Zbarcea <[email protected]> Committed: Wed Mar 12 10:25:08 2014 -0400 ---------------------------------------------------------------------- .../transport/mqtt/MQTTNIOSSLTransportFactory.java | 4 +++- .../transport/mqtt/MQTTNIOTransportFactory.java | 4 +++- .../activemq/transport/mqtt/MQTTTransportFactory.java | 13 +++++++++++++ 3 files changed, 19 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/a482a68d/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java index b9dfaba..f13b537 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java @@ -38,7 +38,7 @@ public class MQTTNIOSSLTransportFactory extends MQTTNIOTransportFactory { @Override protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { - return new TcpTransportServer(this, location, serverSocketFactory) { + TcpTransportServer result = new TcpTransportServer(this, location, serverSocketFactory) { 
protected Transport createTransport(Socket socket, WireFormat format) throws IOException { MQTTNIOSSLTransport transport = new MQTTNIOSSLTransport(format, socket); if (context != null) { @@ -47,6 +47,8 @@ public class MQTTNIOSSLTransportFactory extends MQTTNIOTransportFactory { return transport; } }; + result.setAllowLinkStealing(true); + return result; } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/a482a68d/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java index f18e900..52fa228 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java @@ -49,11 +49,13 @@ public class MQTTNIOTransportFactory extends NIOTransportFactory implements Brok } protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { - return new TcpTransportServer(this, location, serverSocketFactory) { + TcpTransportServer result = new TcpTransportServer(this, location, serverSocketFactory) { protected Transport createTransport(Socket socket, WireFormat format) throws IOException { return new MQTTNIOTransport(format, socket); } }; + result.setAllowLinkStealing(true); + return result; } protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException { http://git-wip-us.apache.org/repos/asf/activemq/blob/a482a68d/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java 
---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java index de50cf2..7b4696a 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java @@ -16,6 +16,10 @@ */ package org.apache.activemq.transport.mqtt; +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; @@ -25,9 +29,12 @@ import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.transport.MutexTransport; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.tcp.TcpTransportFactory; +import org.apache.activemq.transport.tcp.TcpTransportServer; import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.wireformat.WireFormat; +import javax.net.ServerSocketFactory; + /** * A <a href="http://mqtt.org/">MQTT</a> transport factory */ @@ -39,6 +46,12 @@ public class MQTTTransportFactory extends TcpTransportFactory implements BrokerS return "mqtt"; } + protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { + TcpTransportServer result = new TcpTransportServer(this, location, serverSocketFactory); + result.setAllowLinkStealing(true); + return result; + } + @SuppressWarnings("rawtypes") public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { transport = new MQTTTransportFilter(transport, format, brokerContext);
