Fix for https://issues.apache.org/jira/browse/AMQ-4719


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a482a68d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a482a68d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a482a68d

Branch: refs/heads/activemq-5.9
Commit: a482a68d1fa2bcb023d171d078941c473080468a
Parents: 11ad948
Author: Rob Davies <[email protected]>
Authored: Tue Nov 12 08:08:42 2013 +0000
Committer: Hadrian Zbarcea <[email protected]>
Committed: Wed Mar 12 10:25:08 2014 -0400

----------------------------------------------------------------------
 .../transport/mqtt/MQTTNIOSSLTransportFactory.java     |  4 +++-
 .../transport/mqtt/MQTTNIOTransportFactory.java        |  4 +++-
 .../activemq/transport/mqtt/MQTTTransportFactory.java  | 13 +++++++++++++
 3 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a482a68d/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java
index b9dfaba..f13b537 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java
@@ -38,7 +38,7 @@ public class MQTTNIOSSLTransportFactory extends 
MQTTNIOTransportFactory {
 
     @Override
     protected TcpTransportServer createTcpTransportServer(URI location, 
ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException 
{
-        return new TcpTransportServer(this, location, serverSocketFactory) {
+        TcpTransportServer result = new TcpTransportServer(this, location, 
serverSocketFactory) {
             protected Transport createTransport(Socket socket, WireFormat 
format) throws IOException {
                 MQTTNIOSSLTransport transport = new 
MQTTNIOSSLTransport(format, socket);
                 if (context != null) {
@@ -47,6 +47,8 @@ public class MQTTNIOSSLTransportFactory extends 
MQTTNIOTransportFactory {
                 return transport;
             }
         };
+        result.setAllowLinkStealing(true);
+        return result;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/a482a68d/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
index f18e900..52fa228 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
@@ -49,11 +49,13 @@ public class MQTTNIOTransportFactory extends 
NIOTransportFactory implements Brok
     }
 
     protected TcpTransportServer createTcpTransportServer(URI location, 
ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException 
{
-        return new TcpTransportServer(this, location, serverSocketFactory) {
+        TcpTransportServer result = new TcpTransportServer(this, location, 
serverSocketFactory) {
             protected Transport createTransport(Socket socket, WireFormat 
format) throws IOException {
                 return new MQTTNIOTransport(format, socket);
             }
         };
+        result.setAllowLinkStealing(true);
+        return result;
     }
 
     protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory 
socketFactory, URI location, URI localLocation) throws UnknownHostException, 
IOException {

http://git-wip-us.apache.org/repos/asf/activemq/blob/a482a68d/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
index de50cf2..7b4696a 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
@@ -16,6 +16,10 @@
  */
 package org.apache.activemq.transport.mqtt;
 
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -25,9 +29,12 @@ import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.transport.MutexTransport;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.wireformat.WireFormat;
 
+import javax.net.ServerSocketFactory;
+
 /**
  * A <a href="http://mqtt.org/">MQTT</a> transport factory
  */
@@ -39,6 +46,12 @@ public class MQTTTransportFactory extends 
TcpTransportFactory implements BrokerS
         return "mqtt";
     }
 
+    protected TcpTransportServer createTcpTransportServer(URI location, 
ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException 
{
+        TcpTransportServer result =  new TcpTransportServer(this, location, 
serverSocketFactory);
+        result.setAllowLinkStealing(true);
+        return result;
+    }
+
     @SuppressWarnings("rawtypes")
     public Transport compositeConfigure(Transport transport, WireFormat 
format, Map options) {
         transport = new MQTTTransportFilter(transport, format, brokerContext);

Reply via email to