Author: rajdavies
Date: Tue May 22 15:22:55 2012
New Revision: 1341521
URL: http://svn.apache.org/viewvc?rev=1341521&view=rev
Log:
Fix for https://issues.apache.org/jira/browse/AMQ-3855
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java?rev=1341521&r1=1341520&r2=1341521&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
Tue May 22 15:22:55 2012
@@ -261,8 +261,6 @@ class MQTTProtocolConverter {
QoS onSubscribe(SUBSCRIBE command, Topic topic) throws
MQTTProtocolException {
ActiveMQDestination destination = new
ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
-
-
if (destination == null) {
throw new MQTTProtocolException("Invalid Destination.");
}
@@ -458,31 +456,15 @@ class MQTTProtocolConverter {
}
result.topicName(topicName);
- ByteSequence byteSequence = message.getContent();
- if (message.isCompressed()) {
- Inflater inflater = new Inflater();
- inflater.setInput(byteSequence.data, byteSequence.offset,
byteSequence.length);
- byte[] data = new byte[4096];
- int read;
- ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
- while ((read = inflater.inflate(data, 0, data.length)) != 0) {
- bytesOut.write(data, 0, read);
- }
- byteSequence = bytesOut.toByteSequence();
- }
if (message.getDataStructureType() ==
ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
- if (byteSequence.getLength() > 4) {
- byte[] content = new byte[byteSequence.getLength() - 4];
- System.arraycopy(byteSequence.data, 4, content, 0,
content.length);
- result.payload(new Buffer(content));
- } else {
ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
+ msg.setReadOnlyBody(true);
String messageText = msg.getText();
if (messageText != null) {
- result.payload(new
Buffer(msg.getText().getBytes("UTF-8")));
+ result.payload(new Buffer(messageText.getBytes("UTF-8")));
}
- }
+
} else if (message.getDataStructureType() ==
ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
@@ -491,8 +473,29 @@ class MQTTProtocolConverter {
byte[] data = new byte[(int) msg.getBodyLength()];
msg.readBytes(data);
result.payload(new Buffer(data));
- } else {
+ } else if (message.getDataStructureType() ==
ActiveMQMapMessage.DATA_STRUCTURE_TYPE){
+ ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
+ msg.setReadOnlyBody(true);
+ Map map = msg.getContentMap();
+ if (map != null){
+ result.payload(new Buffer(map.toString().getBytes("UTF-8")));
+ }
+ }
+
+ else {
+ ByteSequence byteSequence = message.getContent();
if (byteSequence != null && byteSequence.getLength() > 0) {
+ if (message.isCompressed()){
+ Inflater inflater = new Inflater();
+
inflater.setInput(byteSequence.data,byteSequence.offset,byteSequence.length);
+ byte[] data = new byte[4096];
+ int read;
+ ByteArrayOutputStream bytesOut = new
ByteArrayOutputStream();
+ while((read = inflater.inflate(data)) != 0){
+ bytesOut.write(data,0,read);
+ }
+ byteSequence = bytesOut.toByteSequence();
+ }
result.payload(new Buffer(byteSequence.data,
byteSequence.offset, byteSequence.length));
}
}
@@ -620,9 +623,9 @@ class MQTTProtocolConverter {
}
private String convertMQTTToActiveMQ(String name) {
- String result = name.replace('>', '#');
- result = result.replace('*', '+');
- result = result.replace('.', '/');
+ String result = name.replace('#', '>');
+ result = result.replace('+', '*');
+ result = result.replace('/', '.');
return result;
}
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java?rev=1341521&r1=1341520&r2=1341521&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
Tue May 22 15:22:55 2012
@@ -17,9 +17,13 @@
package org.apache.activemq.transport.mqtt;
import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
@@ -63,6 +67,47 @@ public class MQTTTest {
}
@Test
+ public void testSendAndReceiveMQTT() throws Exception {
+ addMQTTConnector(brokerService);
+ brokerService.start();
+ MQTT mqtt = new MQTT();
+ final BlockingConnection subscribeConnection =
mqtt.blockingConnection();
+ subscribeConnection.connect();
+ Topic topic = new Topic("foo/bah",QoS.AT_MOST_ONCE);
+ Topic[] topics = {topic};
+ subscribeConnection.subscribe(topics);
+ final CountDownLatch latch = new CountDownLatch(numberOfMessages);
+
+ Thread thread = new Thread(new Runnable() {
+ public void run() {
+ for (int i = 0; i < numberOfMessages; i++){
+ try {
+ Message message = subscribeConnection.receive();
+ message.ack();
+ latch.countDown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ break;
+ }
+
+ }
+ }
+ });
+ thread.start();
+
+ BlockingConnection publisherConnection = mqtt.blockingConnection();
+ publisherConnection.connect();
+ for (int i = 0; i < numberOfMessages; i++){
+ String payload = "Message " + i;
+
publisherConnection.publish(topic.name().toString(),payload.getBytes(),QoS.AT_LEAST_ONCE,false);
+ }
+
+ latch.await(10, TimeUnit.SECONDS);
+ assertEquals(0, latch.getCount());
+
+ }
+
+ @Test
public void testSendAndReceiveAtMostOnce() throws Exception {
addMQTTConnector(brokerService);
brokerService.start();
@@ -172,7 +217,7 @@ public class MQTTTest {
brokerService.start();
MQTT mqtt = createMQTTConnection();
BlockingConnection connection = mqtt.blockingConnection();
- final String DESTINATION_NAME = "foo";
+ final String DESTINATION_NAME = "foo.*";
connection.connect();
ActiveMQConnection activeMQConnection = (ActiveMQConnection) new
ActiveMQConnectionFactory().createConnection();
@@ -183,7 +228,7 @@ public class MQTTTest {
for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i;
- connection.publish("foo", payload.getBytes(), QoS.AT_LEAST_ONCE,
false);
+ connection.publish("foo/bah", payload.getBytes(),
QoS.AT_LEAST_ONCE, false);
ActiveMQMessage message = (ActiveMQMessage) consumer.receive();
ByteSequence bs = message.getContent();
assertEquals(payload, new String(bs.data, bs.offset, bs.length));
@@ -194,6 +239,36 @@ public class MQTTTest {
connection.disconnect();
}
+ @Test
+ public void testSendJMSReceiveMQTT() throws Exception {
+ addMQTTConnector(brokerService);
+
brokerService.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
+ brokerService.start();
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setKeepAlive(Short.MAX_VALUE);
+ BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+
+ ActiveMQConnection activeMQConnection = (ActiveMQConnection) new
ActiveMQConnectionFactory().createConnection();
+ activeMQConnection.start();
+ Session s = activeMQConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ javax.jms.Topic jmsTopic = s.createTopic("foo.far");
+ MessageProducer producer = s.createProducer(jmsTopic);
+
+ Topic[] topics = {new Topic(utf8("foo/far"), QoS.AT_MOST_ONCE)};
+ connection.subscribe(topics);
+ for (int i = 0; i < numberOfMessages; i++) {
+ String payload = "This is Test Message: " + i;
+ TextMessage sendMessage = s.createTextMessage(payload);
+ producer.send(sendMessage);
+ Message message = connection.receive();
+ message.ack();
+ assertEquals(payload, new String(message.getPayload()));
+ }
+ connection.disconnect();
+ }
+
+
protected void addMQTTConnector(BrokerService brokerService) throws
Exception {
brokerService.addConnector("mqtt://localhost:1883");
}