Copied:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java
(from r1303689,
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java)
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java&r1=1303689&r2=1304984&rev=1304984&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java
Sun Mar 25 06:33:49 2012
@@ -14,30 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
-
-import java.io.*;
-import java.util.HashMap;
-import java.util.Map;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.mqtt.codec.MQTTFrame;
/**
* Implements marshalling and unmarsalling the <a
- * href="http://stomp.codehaus.org/">Stomp</a> protocol.
+ * href="http://mqtt.org/">MQTT</a> protocol.
*/
-public class StompWireFormat implements WireFormat {
+public class MQTTWireFormat implements WireFormat {
- private static final byte[] NO_DATA = new byte[] {};
- private static final byte[] END_OF_FRAME = new byte[] {0, '\n'};
- private static final int MAX_COMMAND_LENGTH = 1024;
- private static final int MAX_HEADER_LENGTH = 1024 * 10;
- private static final int MAX_HEADERS = 1000;
- private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
+ private static final int MAX_MESSAGE_LENGTH = 1024 * 1024 * 256;
private boolean encodingEnabled = false;
private int version = 1;
@@ -56,254 +55,70 @@ public class StompWireFormat implements
return unmarshal(dis);
}
- public void marshal(Object command, DataOutput os) throws IOException {
- StompFrame stomp =
(org.apache.activemq.transport.stomp.StompFrame)command;
-
- if (stomp.getAction().equals(Stomp.Commands.KEEPALIVE)) {
- os.write(Stomp.BREAK);
- return;
- }
-
- StringBuilder buffer = new StringBuilder();
- buffer.append(stomp.getAction());
- buffer.append(Stomp.NEWLINE);
-
- // Output the headers.
- for (Map.Entry<String, String> entry : stomp.getHeaders().entrySet()) {
- buffer.append(entry.getKey());
- buffer.append(Stomp.Headers.SEPERATOR);
- buffer.append(encodeHeader(entry.getValue()));
- buffer.append(Stomp.NEWLINE);
+ public void marshal(Object command, DataOutput dataOut) throws IOException
{
+ MQTTFrame frame = (MQTTFrame) command;
+ dataOut.write(frame.header());
+
+ int remaining = 0;
+ for (Buffer buffer : frame.buffers) {
+ remaining += buffer.length;
+ }
+ do {
+ byte digit = (byte) (remaining & 0x7F);
+ remaining >>>= 7;
+ if (remaining > 0) {
+ digit |= 0x80;
+ }
+ dataOut.write(digit);
+ } while (remaining > 0);
+ for (Buffer buffer : frame.buffers) {
+ dataOut.write(buffer.data, buffer.offset, buffer.length);
}
-
- // Add a newline to seperate the headers from the content.
- buffer.append(Stomp.NEWLINE);
-
- os.write(buffer.toString().getBytes("UTF-8"));
- os.write(stomp.getContent());
- os.write(END_OF_FRAME);
}
- public Object unmarshal(DataInput in) throws IOException {
-
- try {
-
- // parse action
- String action = parseAction(in);
-
- // Parse the headers
- HashMap<String, String> headers = parseHeaders(in);
-
- // Read in the data part.
- byte[] data = NO_DATA;
- String contentLength = headers.get(Stomp.Headers.CONTENT_LENGTH);
- if ((action.equals(Stomp.Commands.SEND) ||
action.equals(Stomp.Responses.MESSAGE)) && contentLength != null) {
-
- // Bless the client, he's telling us how much data to read in.
- int length = parseContentLength(contentLength);
-
- data = new byte[length];
- in.readFully(data);
-
- if (in.readByte() != 0) {
- throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH +
" bytes were read and " + "there was no trailing null byte", true);
- }
-
+ public Object unmarshal(DataInput dataIn) throws IOException {
+ byte header = dataIn.readByte();
+
+ byte digit = 0;
+
+ int multiplier = 1;
+ int length = 0;
+ do {
+ digit = dataIn.readByte();
+ length += (digit & 0x7F) * multiplier;
+ multiplier <<= 7;
+ }
+ while ((digit & 0x80) != 0);
+ if (length >= 0) {
+ if (length > MAX_MESSAGE_LENGTH) {
+ throw new IOException("The maximum message length was
exceeded");
+ }
+
+ if (length > 0) {
+ byte[] data = new byte[length];
+ dataIn.readFully(data);
+ Buffer body = new Buffer(data);
+ return new MQTTFrame(body).header(header);
} else {
-
- // We don't know how much to read.. data ends when we hit a 0
- byte b;
- ByteArrayOutputStream baos = null;
- while ((b = in.readByte()) != 0) {
-
- if (baos == null) {
- baos = new ByteArrayOutputStream();
- } else if (baos.size() > MAX_DATA_LENGTH) {
- throw new ProtocolException("The maximum data length
was exceeded", true);
- }
-
- baos.write(b);
- }
-
- if (baos != null) {
- baos.close();
- data = baos.toByteArray();
- }
+ return new MQTTFrame().header(header);
}
-
- return new StompFrame(action, headers, data);
-
- } catch (ProtocolException e) {
- return new StompFrameError(e);
}
+ return null;
}
- private String readLine(DataInput in, int maxLength, String errorMessage)
throws IOException {
- ByteSequence sequence = readHeaderLine(in, maxLength, errorMessage);
- return new String(sequence.getData(), sequence.getOffset(),
sequence.getLength(), "UTF-8").trim();
- }
-
- private ByteSequence readHeaderLine(DataInput in, int maxLength, String
errorMessage) throws IOException {
- byte b;
- ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength);
- while ((b = in.readByte()) != '\n') {
- if (baos.size() > maxLength) {
- throw new ProtocolException(errorMessage, true);
- }
- baos.write(b);
- }
- baos.close();
- return baos.toByteSequence();
- }
-
- protected String parseAction(DataInput in) throws IOException {
- String action = null;
-
- // skip white space to next real action line
- while (true) {
- action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command
length was exceeded");
- if (action == null) {
- throw new IOException("connection was closed");
- } else {
- action = action.trim();
- if (action.length() > 0) {
- break;
- }
- }
- }
- return action;
- }
-
- protected HashMap<String, String> parseHeaders(DataInput in) throws
IOException {
- HashMap<String, String> headers = new HashMap<String, String>(25);
- while (true) {
- ByteSequence line = readHeaderLine(in, MAX_HEADER_LENGTH, "The
maximum header length was exceeded");
- if (line != null && line.length > 1) {
-
- if (headers.size() > MAX_HEADERS) {
- throw new ProtocolException("The maximum number of headers
was exceeded", true);
- }
-
- try {
-
- ByteArrayInputStream headerLine = new
ByteArrayInputStream(line);
- ByteArrayOutputStream stream = new
ByteArrayOutputStream(line.length);
-
- // First complete the name
- int result = -1;
- while ((result = headerLine.read()) != -1) {
- if (result != ':') {
- stream.write(result);
- } else {
- break;
- }
- }
-
- ByteSequence nameSeq = stream.toByteSequence();
- String name = new String(nameSeq.getData(),
nameSeq.getOffset(), nameSeq.getLength(), "UTF-8").trim();
- String value = decodeHeader(headerLine).trim();
- headers.put(name, value);
- } catch (Exception e) {
- throw new ProtocolException("Unable to parser header line
[" + line + "]", true);
- }
- } else {
- break;
- }
- }
- return headers;
- }
-
- protected int parseContentLength(String contentLength) throws
ProtocolException {
- int length;
- try {
- length = Integer.parseInt(contentLength.trim());
- } catch (NumberFormatException e) {
- throw new ProtocolException("Specified content-length is not a
valid integer", true);
- }
-
- if (length > MAX_DATA_LENGTH) {
- throw new ProtocolException("The maximum data length was
exceeded", true);
- }
-
- return length;
- }
-
- private String encodeHeader(String header) throws IOException {
- String result = header;
- if (this.encodingEnabled) {
- byte[] utf8buf = header.getBytes("UTF-8");
- ByteArrayOutputStream stream = new
ByteArrayOutputStream(utf8buf.length);
- for(byte val : utf8buf) {
- switch(val) {
- case Stomp.ESCAPE:
- stream.write(Stomp.ESCAPE_ESCAPE_SEQ);
- break;
- case Stomp.BREAK:
- stream.write(Stomp.NEWLINE_ESCAPE_SEQ);
- break;
- case Stomp.COLON:
- stream.write(Stomp.COLON_ESCAPE_SEQ);
- break;
- default:
- stream.write(val);
- }
- }
- result = new String(stream.toByteArray(), "UTF-8");
- }
-
- return result;
- }
-
- private String decodeHeader(InputStream header) throws IOException {
- ByteArrayOutputStream decoded = new ByteArrayOutputStream();
- PushbackInputStream stream = new PushbackInputStream(header);
-
- int value = -1;
- while( (value = stream.read()) != -1) {
- if (value == 92) {
-
- int next = stream.read();
- if (next != -1) {
- switch(next) {
- case 110:
- decoded.write(Stomp.BREAK);
- break;
- case 99:
- decoded.write(Stomp.COLON);
- break;
- case 92:
- decoded.write(Stomp.ESCAPE);
- break;
- default:
- stream.unread(next);
- decoded.write(value);
- }
- } else {
- decoded.write(value);
- }
-
- } else {
- decoded.write(value);
- }
- }
-
- return new String(decoded.toByteArray(), "UTF-8");
- }
-
- public int getVersion() {
- return version;
- }
-
+ /**
+ * @param the version of the wire format
+ */
public void setVersion(int version) {
this.version = version;
}
- public boolean isEncodingEnabled() {
- return this.encodingEnabled;
+ /**
+ * @return the version of the wire format
+ */
+ public int getVersion() {
+ return this.version;
}
- public void setEncodingEnabled(boolean value) {
- this.encodingEnabled = value;
- }
}
Propchange:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java
(from r1303689,
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java)
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java&r1=1303689&r2=1304984&rev=1304984&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java
Sun Mar 25 06:33:49 2012
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
@@ -22,8 +22,8 @@ import org.apache.activemq.wireformat.Wi
/**
* Creates WireFormat objects that marshalls the <a
href="http://stomp.codehaus.org/">Stomp</a> protocol.
*/
-public class StompWireFormatFactory implements WireFormatFactory {
+public class MQTTWireFormatFactory implements WireFormatFactory {
public WireFormat createWireFormat() {
- return new StompWireFormat();
+ return new MQTTWireFormat();
}
}
Propchange:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/ResponseHandler.java
(from r1303689,
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ResponseHandler.java)
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/ResponseHandler.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/ResponseHandler.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ResponseHandler.java&r1=1303689&r2=1304984&rev=1304984&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ResponseHandler.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/ResponseHandler.java
Sun Mar 25 06:33:49 2012
@@ -14,17 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import org.apache.activemq.command.Response;
+
/**
- * Interface used by the ProtocolConverter for callbacks.
- *
- * @author <a href="http://hiramchirino.com">chirino</a>
+ * Interface used by the MQTTProtocolConverter for callbacks.
*/
interface ResponseHandler {
- void onResponse(ProtocolConverter converter, Response response) throws
IOException;
+ void onResponse(MQTTProtocolConverter converter, Response response) throws
IOException;
}
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/WildCardConvertor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/WildCardConvertor.java?rev=1304984&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/WildCardConvertor.java
(added)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/WildCardConvertor.java
Sun Mar 25 06:33:49 2012
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+public class WildCardConvertor {
+
+ static String convertActiveMQToMQTT(String name) {
+ String result = name.replaceAll("#", ">");
+ result = result.replaceAll("+", "*");
+ result = result.replaceAll("/", ".");
+ return result;
+ }
+
+ static String convertMQTTToActiveMQ(String name) {
+ String result = name.replaceAll(">", "#");
+ result = result.replaceAll("*", "+");
+ result = result.replaceAll(".", "/");
+ return result;
+ }
+}
Copied:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/package.html
(from r1303689,
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/package.html)
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/package.html?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/package.html&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/package.html&r1=1303689&r2=1304984&rev=1304984&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/package.html
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/package.html
Sun Mar 25 06:33:49 2012
@@ -19,8 +19,7 @@
</head>
<body>
-An implementation of the Stomp protocol which is a simple wire protocol for
writing clients for ActiveMQ in different
-languages like Ruby, Python, PHP, C etc.
+An implementation of the MQTT 3.1 protocol - see http://mqtt.org/
</body>
</html>
Copied:
activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt
(from r1303689,
activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp)
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt?p2=activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt&p1=activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp&r1=1303689&r2=1304984&rev=1304984&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp
(original)
+++
activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt
Sun Mar 25 06:33:49 2012
@@ -14,4 +14,4 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
-class=org.apache.activemq.transport.stomp.StompTransportFactory
+class=org.apache.activemq.transport.mqtt.MQTTTransportFactory
Copied:
activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/mqtt
(from r1303689,
activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp)
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/mqtt?p2=activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/mqtt&p1=activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp&r1=1303689&r2=1304984&rev=1304984&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp
(original)
+++
activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/mqtt
Sun Mar 25 06:33:49 2012
@@ -14,4 +14,4 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
-class=org.apache.activemq.transport.stomp.StompWireFormatFactory
\ No newline at end of file
+class=org.apache.activemq.transport.mqtt.MQTTWireFormatFactory
\ No newline at end of file
Copied:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java
(from r1303689,
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java)
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java?p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java&r1=1303689&r2=1304984&rev=1304984&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java
Sun Mar 25 06:33:49 2012
@@ -14,29 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
import java.util.Vector;
-import javax.net.ServerSocketFactory;
-import org.apache.activemq.broker.BrokerPlugin;
+
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.security.JaasDualAuthenticationPlugin;
-import org.apache.activemq.util.Wait;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import static junit.framework.Assert.assertTrue;
-
// https://issues.apache.org/jira/browse/AMQ-3393
-public class ConnectTest {
- private static final Logger LOG =
LoggerFactory.getLogger(ConnectTest.class);
+public class MQTTConnectTest {
+ private static final Logger LOG =
LoggerFactory.getLogger(MQTTConnectTest.class);
BrokerService brokerService;
Vector<Throwable> exceptions = new Vector<Throwable>();
@@ -56,123 +49,18 @@ public class ConnectTest {
}
@Test
- public void testStompConnectLeak() throws Exception {
+ public void testConnect() throws Exception {
- brokerService.addConnector("stomp://0.0.0.0:0?transport.soLinger=0");
+ brokerService.addConnector("mqtt://localhost:1883");
brokerService.start();
-
- Thread t1 = new Thread() {
- StompConnection connection = new StompConnection();
-
- public void run() {
- try {
- connection.open("localhost",
brokerService.getTransportConnectors().get(0).getConnectUri().getPort());
- connection.connect("system", "manager");
- connection.disconnect();
- } catch (Exception ex) {
- LOG.error("unexpected exception on connect/disconnect",
ex);
- exceptions.add(ex);
- }
- }
- };
-
- int i = 0;
- long done = System.currentTimeMillis() + (15 * 1000);
- while (System.currentTimeMillis() < done) {
- t1.run();
- if (++i % 5000 == 0) {
- LOG.info("connection count on stomp connector:" +
brokerService.getTransportConnectors().get(0).connectionCount());
- }
- }
-
- assertTrue("no dangling connections", Wait.waitFor(new
Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return 0 ==
brokerService.getTransportConnectors().get(0).connectionCount();
- }
- }));
- assertTrue("no exceptions", exceptions.isEmpty());
+ MQTT mqtt = new MQTT();
+ mqtt.setHost("localhost",1883);
+ BlockingConnection connection = mqtt.blockingConnection();
+
+ connection.connect();
+ Thread.sleep(1000);
+ connection.disconnect();
}
- @Test
- public void testJaasDualStopWithOpenConnection() throws Exception {
-
- brokerService.setPlugins(new BrokerPlugin[]{new
JaasDualAuthenticationPlugin()});
-
brokerService.addConnector("stomp://0.0.0.0:0?transport.closeAsync=false");
- brokerService.start();
-
- final int listenPort =
brokerService.getTransportConnectors().get(0).getConnectUri().getPort();
- Thread t1 = new Thread() {
- StompConnection connection = new StompConnection();
-
- public void run() {
- try {
- connection.open("localhost", listenPort);
- connection.connect("system", "manager");
- } catch (Exception ex) {
- LOG.error("unexpected exception on connect/disconnect",
ex);
- exceptions.add(ex);
- }
- }
- };
-
- t1.run();
-
- assertTrue("one connection", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return 1 ==
brokerService.getTransportConnectors().get(0).connectionCount();
- }
- }));
-
- brokerService.stop();
-
- // server socket should be available after stop
- ServerSocket socket =
ServerSocketFactory.getDefault().createServerSocket();
- socket.setReuseAddress(true);
- InetAddress address = InetAddress.getLocalHost();
- socket.bind(new InetSocketAddress(address, listenPort));
- LOG.info("bound address: " + socket);
- socket.close();
- assertTrue("no exceptions", exceptions.isEmpty());
- }
-
- @Test
- public void testInactivityMonitor() throws Exception {
-
-
brokerService.addConnector("stomp://0.0.0.0:0?transport.defaultHeartBeat=5000,0&transport.useKeepAlive=false");
- brokerService.start();
-
- Thread t1 = new Thread() {
- StompConnection connection = new StompConnection();
-
- public void run() {
- try {
- connection.open("localhost",
brokerService.getTransportConnectors().get(0).getConnectUri().getPort());
- connection.connect("system", "manager");
- } catch (Exception ex) {
- LOG.error("unexpected exception on connect/disconnect",
ex);
- exceptions.add(ex);
- }
- }
- };
-
- t1.run();
-
- assertTrue("one connection", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return 1 ==
brokerService.getTransportConnectors().get(0).connectionCount();
- }
- }));
-
- // and it should be closed due to inactivity
- assertTrue("no dangling connections", Wait.waitFor(new
Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return 0 ==
brokerService.getTransportConnectors().get(0).connectionCount();
- }
- }));
- assertTrue("no exceptions", exceptions.isEmpty());
- }
+
}
\ No newline at end of file
Copied:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
(from r1303689,
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java)
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java?p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java&r1=1303689&r2=1304984&rev=1304984&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
Sun Mar 25 06:33:49 2012
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import java.net.Socket;
@@ -41,7 +41,6 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
-
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerFactory;
@@ -50,14 +49,18 @@ import org.apache.activemq.broker.jmx.Br
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.transport.stomp.SamplePojo;
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompConnection;
+import org.apache.activemq.transport.stomp.StompFrame;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class StompTest extends CombinationTestSupport {
- private static final Logger LOG = LoggerFactory.getLogger(StompTest.class);
+public class MQTTTest extends CombinationTestSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
- protected String bindAddress = "stomp://localhost:61613";
+ protected String bindAddress = "mqtt://localhost:1883";
protected String confUri =
"xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml";
protected String jmsUri = "vm://localhost";