Repository: activemq
Updated Branches:
  refs/heads/trunk 6972d37e6 -> 4ba4aa21d


https://issues.apache.org/jira/browse/AMQ-5112 - mqtt transport thread safety


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4ba4aa21
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4ba4aa21
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4ba4aa21

Branch: refs/heads/trunk
Commit: 4ba4aa21d36f86554812d85a2b0143a6b8790d6d
Parents: 6972d37
Author: Dejan Bosanac <[email protected]>
Authored: Thu Mar 20 11:31:44 2014 +0100
Committer: Dejan Bosanac <[email protected]>
Committed: Thu Mar 20 11:31:44 2014 +0100

----------------------------------------------------------------------
 .../transport/mqtt/MQTTProtocolConverter.java   |  4 +--
 .../transport/mqtt/MQTTTransportFilter.java     | 16 +++++----
 .../activemq/transport/mqtt/MQTTNioTest.java    | 15 ---------
 .../activemq/transport/mqtt/MQTTSSLTest.java    | 18 +----------
 .../activemq/transport/mqtt/MQTTTest.java       | 34 ++++++--------------
 5 files changed, 22 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/4ba4aa21/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git 
a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
 
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 614b133..014c6f6 100644
--- 
a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ 
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -121,7 +121,7 @@ public class MQTTProtocolConverter {
             command.setResponseRequired(true);
             resposeHandlers.put(command.getCommandId(), handler);
         }
-        mqttTransport.sendToActiveMQ(command);
+        getMQTTTransport().sendToActiveMQ(command);
     }
 
     void sendToMQTT(MQTTFrame frame) {
@@ -140,7 +140,7 @@ public class MQTTProtocolConverter {
         switch (frame.messageType()) {
             case PINGREQ.TYPE: {
                 LOG.debug("Received a ping from client: " + getClientId());
-                mqttTransport.sendToMQTT(PING_RESP_FRAME);
+                sendToMQTT(PING_RESP_FRAME);
                 LOG.debug("Sent Ping Response to " + getClientId());
                 break;
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/4ba4aa21/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
----------------------------------------------------------------------
diff --git 
a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
 
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
index 54f40e7..181d235 100644
--- 
a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
+++ 
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
@@ -17,13 +17,10 @@
 package org.apache.activemq.transport.mqtt;
 
 import java.io.IOException;
-import java.net.ProtocolException;
 import java.security.cert.X509Certificate;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import javax.jms.JMSException;
 
-import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.transport.Transport;
@@ -51,6 +48,7 @@ public class MQTTTransportFilter extends TransportFilter 
implements MQTTTranspor
     private final AtomicBoolean stopped = new AtomicBoolean();
 
     private boolean trace;
+    private final Object sendLock = new Object();
 
     public MQTTTransportFilter(Transport next, WireFormat wireFormat, 
BrokerService brokerService) {
         super(next);
@@ -80,7 +78,7 @@ public class MQTTTransportFilter extends TransportFilter 
implements MQTTTranspor
             }
             protocolConverter.onMQTTCommand(frame);
         } catch (IOException e) {
-            handleException(e);
+            onException(e);
         } catch (JMSException e) {
             onException(IOExceptionSupport.create(e));
         }
@@ -102,7 +100,10 @@ public class MQTTTransportFilter extends TransportFilter 
implements MQTTTranspor
             }
             Transport n = next;
             if (n != null) {
-                n.oneway(command);
+                // sync access to underlying transport buffer
+                synchronized (sendLock) {
+                    n.oneway(command);
+                }
             }
         }
     }
@@ -174,9 +175,10 @@ public class MQTTTransportFilter extends TransportFilter 
implements MQTTTranspor
         return this.wireFormat;
     }
 
-    public void handleException(IOException e) {
+    @Override
+    public void onException(IOException error) {
         protocolConverter.onTransportError();
-        super.onException(e);
+        super.onException(error);
     }
 
     public long getDefaultKeepAlive() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/4ba4aa21/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNioTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNioTest.java
 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNioTest.java
index 2433d2e..2513ac3 100644
--- 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNioTest.java
+++ 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNioTest.java
@@ -30,7 +30,6 @@ import 
org.apache.activemq.security.SimpleAuthenticationPlugin;
 import org.apache.activemq.util.Wait;
 import org.fusesource.mqtt.client.BlockingConnection;
 import org.fusesource.mqtt.client.MQTT;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.BlockJUnit4ClassRunner;
@@ -43,20 +42,6 @@ public class MQTTNioTest extends MQTTTest {
         return "mqtt+nio";
     }
 
-    @Ignore("See AMQ-4712")
-    @Override
-    @Test
-    public void testReceiveMessageSentWhileOffline() throws Exception {
-        super.testReceiveMessageSentWhileOffline();
-    }
-
-    @Ignore("See AMQ-4712")
-    @Override
-    @Test
-    public void testResendMessageId() throws Exception {
-        super.testResendMessageId();
-    }
-
     @Test
     public void testPingOnMQTTNIO() throws Exception {
         addMQTTConnector("maxInactivityDuration=-1");

http://git-wip-us.apache.org/repos/asf/activemq/blob/4ba4aa21/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
index 6b44ae2..1eb4ff5 100644
--- 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
+++ 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
@@ -19,14 +19,12 @@ package org.apache.activemq.transport.mqtt;
 import java.security.SecureRandom;
 import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
-
 import javax.net.ssl.KeyManager;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.X509TrustManager;
+
 import org.fusesource.mqtt.client.MQTT;
-import org.junit.Ignore;
-import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.BlockJUnit4ClassRunner;
 import org.slf4j.Logger;
@@ -53,20 +51,6 @@ public class MQTTSSLTest extends MQTTTest {
         return "mqtt+ssl";
     }
 
-    @Ignore("See AMQ-4712")
-    @Override
-    @Test
-    public void testReceiveMessageSentWhileOffline() throws Exception {
-        super.testReceiveMessageSentWhileOffline();
-    }
-
-    @Ignore("See AMQ-4712")
-    @Override
-    @Test
-    public void testResendMessageId() throws Exception {
-        super.testResendMessageId();
-    }
-
     protected MQTT createMQTTConnection() throws Exception {
         MQTT mqtt = new MQTT();
         mqtt.setConnectAttemptsMax(1);

http://git-wip-us.apache.org/repos/asf/activemq/blob/4ba4aa21/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index d5d3983..3acb4bb 100644
--- 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -46,7 +46,6 @@ import 
org.apache.activemq.security.SimpleAuthenticationPlugin;
 import org.apache.activemq.security.SimpleAuthorizationMap;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.Wait;
-import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.mqtt.client.BlockingConnection;
 import org.fusesource.mqtt.client.MQTT;
 import org.fusesource.mqtt.client.Message;
@@ -590,15 +589,7 @@ public class MQTTTest extends AbstractMQTTTest {
                 if (frame.messageType() == PUBLISH.TYPE) {
                     PUBLISH publish = new PUBLISH();
                     try {
-                        // copy the buffers before we decode
-                        Buffer[] buffers = frame.buffers();
-                        Buffer[] copy = new Buffer[buffers.length];
-                        for (int i = 0; i < buffers.length; i++) {
-                            copy[i] = buffers[i].deepCopy();
-                        }
                         publish.decode(frame);
-                        // reset frame buffers to deep copy
-                        frame.buffers(copy);
                     } catch (ProtocolException e) {
                         fail("Error decoding publish " + e.getMessage());
                     }
@@ -684,15 +675,7 @@ public class MQTTTest extends AbstractMQTTTest {
                 if (frame.messageType() == PUBLISH.TYPE) {
                     PUBLISH publish = new PUBLISH();
                     try {
-                        // copy the buffers before we decode
-                        Buffer[] buffers = frame.buffers();
-                        Buffer[] copy = new Buffer[buffers.length];
-                        for (int i = 0; i < buffers.length; i++) {
-                            copy[i] = buffers[i].deepCopy();
-                        }
                         publish.decode(frame);
-                        // reset frame buffers to deep copy
-                        frame.buffers(copy);
                     } catch (ProtocolException e) {
                         fail("Error decoding publish " + e.getMessage());
                     }
@@ -717,25 +700,28 @@ public class MQTTTest extends AbstractMQTTTest {
         // publish non-retained message
         connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
 
-        Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+        Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
         assertNotNull(msg);
         assertEquals(TOPIC, new String(msg.getPayload()));
-        msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+        msg = connection.receive(1000, TimeUnit.MILLISECONDS);
         assertNotNull(msg);
         assertEquals(TOPIC, new String(msg.getPayload()));
 
         // drop subs without acknowledging messages, then subscribe and 
receive again
         connection.unsubscribe(subs);
+        Thread.sleep(1000);
         connection.subscribe(new Topic[]{new Topic(subs[0], 
QoS.AT_LEAST_ONCE), new Topic(subs[1], QoS.EXACTLY_ONCE)});
+        Thread.sleep(1000);
 
         msg = connection.receive(5000, TimeUnit.MILLISECONDS);
         assertNotNull(msg);
         assertEquals(TOPIC, new String(msg.getPayload()));
+        final Message msg2 = connection.receive(5000, TimeUnit.MILLISECONDS);
+        assertNotNull(msg2);
+        assertEquals(TOPIC, new String(msg2.getPayload()));
+        // ack messages after receiving all of them
         msg.ack();
-        msg = connection.receive(5000, TimeUnit.MILLISECONDS);
-        assertNotNull(msg);
-        assertEquals(TOPIC, new String(msg.getPayload()));
-        msg.ack();
+        msg2.ack();
 
         // make sure we received duplicate message ids
         List<Integer> dups = new ArrayList<Integer>();
@@ -1177,4 +1163,4 @@ public class MQTTTest extends AbstractMQTTTest {
         connection.disconnect();
     }
 
-}
\ No newline at end of file
+}

Reply via email to