Author: tabish
Date: Wed Mar 20 15:56:12 2013
New Revision: 1458900
URL: http://svn.apache.org/r1458900
Log:
Fix for: https://issues.apache.org/jira/browse/AMQ-4392
Unregister the connection on the Broker side so clients with the same client
ID can reconnect.
Modified:
activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
activemq/trunk/activemq-mqtt/src/test/resources/log4j.properties
Modified:
activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java?rev=1458900&r1=1458899&r2=1458900&view=diff
==============================================================================
---
activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
(original)
+++
activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
Wed Mar 20 15:56:12 2013
@@ -50,6 +50,7 @@ import org.apache.activemq.command.Remov
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
@@ -105,7 +106,7 @@ class MQTTProtocolConverter {
private final Object commnadIdMutex = new Object();
private int lastCommandId;
private final AtomicBoolean connected = new AtomicBoolean(false);
- private ConnectionInfo connectionInfo = new ConnectionInfo();
+ private final ConnectionInfo connectionInfo = new ConnectionInfo();
private CONNECT connect;
private String clientId;
private long defaultKeepAlive;
@@ -159,7 +160,7 @@ class MQTTProtocolConverter {
}
case DISCONNECT.TYPE: {
LOG.debug("MQTT Client " + getClientId() + " disconnecting");
- stopTransport();
+ onMQTTDisconnect();
break;
}
case SUBSCRIBE.TYPE: {
@@ -232,6 +233,7 @@ class MQTTProtocolConverter {
connectionInfo.setTransportContext(mqttTransport.getPeerCertificates());
sendToActiveMQ(connectionInfo, new ResponseHandler() {
+ @Override
public void onResponse(MQTTProtocolConverter converter, Response
response) throws IOException {
if (response.isException()) {
@@ -250,6 +252,7 @@ class MQTTProtocolConverter {
final ProducerInfo producerInfo = new ProducerInfo(producerId);
sendToActiveMQ(producerInfo, new ResponseHandler() {
+ @Override
public void onResponse(MQTTProtocolConverter converter,
Response response) throws IOException {
if (response.isException()) {
@@ -272,6 +275,14 @@ class MQTTProtocolConverter {
});
}
+ void onMQTTDisconnect() throws MQTTProtocolException {
+ if (connected.get()) {
+ sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
+ sendToActiveMQ(new ShutdownInfo(), null);
+ }
+ stopTransport();
+ }
+
void onSubscribe(SUBSCRIBE command) throws MQTTProtocolException {
checkConnected();
Topic[] topics = command.topics();
@@ -516,6 +527,7 @@ class MQTTProtocolConverter {
bytesOut.write(data, 0, read);
}
byteSequence = bytesOut.toByteSequence();
+ bytesOut.close();
}
result.payload(new Buffer(byteSequence.data,
byteSequence.offset, byteSequence.length));
}
@@ -555,7 +567,6 @@ class MQTTProtocolConverter {
return;
}
-
long keepAliveMS = keepAliveSeconds * 1000;
if (LOG.isDebugEnabled()) {
@@ -586,9 +597,6 @@ class MQTTProtocolConverter {
} catch (Exception ex) {
LOG.warn("Failed to start MQTT InactivityMonitor ", ex);
}
-
-
-
}
void handleException(Throwable exception, MQTTFrame command) {
@@ -636,6 +644,7 @@ class MQTTProtocolConverter {
switch (command.qos()) {
case AT_LEAST_ONCE:
return new ResponseHandler() {
+ @Override
public void onResponse(MQTTProtocolConverter
converter, Response response) throws IOException {
if (response.isException()) {
LOG.warn("Failed to send MQTT Publish: ",
command, ((ExceptionResponse) response).getException());
@@ -648,6 +657,7 @@ class MQTTProtocolConverter {
};
case EXACTLY_ONCE:
return new ResponseHandler() {
+ @Override
public void onResponse(MQTTProtocolConverter
converter, Response response) throws IOException {
if (response.isException()) {
LOG.warn("Failed to send MQTT Publish: ",
command, ((ExceptionResponse) response).getException());
Modified:
activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java?rev=1458900&r1=1458899&r2=1458900&view=diff
==============================================================================
---
activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
(original)
+++
activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
Wed Mar 20 15:56:12 2013
@@ -21,6 +21,7 @@ import java.security.cert.X509Certificat
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
+
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
@@ -58,6 +59,7 @@ public class MQTTTransportFilter extends
}
}
+ @Override
public void oneway(Object o) throws IOException {
try {
final Command command = (Command) o;
@@ -67,6 +69,7 @@ public class MQTTTransportFilter extends
}
}
+ @Override
public void onCommand(Object command) {
try {
if (trace) {
@@ -81,6 +84,7 @@ public class MQTTTransportFilter extends
}
}
+ @Override
public void sendToActiveMQ(Command command) {
TransportListener l = transportListener;
if (l != null) {
@@ -88,6 +92,7 @@ public class MQTTTransportFilter extends
}
}
+ @Override
public void sendToMQTT(MQTTFrame command) throws IOException {
if( !stopped.get() ) {
if (trace) {
@@ -107,6 +112,7 @@ public class MQTTTransportFilter extends
}
}
+ @Override
public X509Certificate[] getPeerCertificates() {
if (next instanceof SslTransport) {
X509Certificate[] peerCerts = ((SslTransport)
next).getPeerCertificates();
@@ -162,10 +168,7 @@ public class MQTTTransportFilter extends
* The default = 1
* @param activeMQSubscriptionPrefetch set the prefetch for the
corresponding ActiveMQ subscription
*/
-
public void setActiveMQSubscriptionPrefetch(int
activeMQSubscriptionPrefetch) {
protocolConverter.setActiveMQSubscriptionPrefetch(activeMQSubscriptionPrefetch);
}
-
-
}
Modified:
activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java?rev=1458900&r1=1458899&r2=1458900&view=diff
==============================================================================
---
activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
(original)
+++
activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
Wed Mar 20 15:56:12 2013
@@ -84,6 +84,28 @@ public class MQTTTest extends AbstractMQ
}));
}
+ @Test(timeout=300000)
+ public void testReuseConnection() throws Exception {
+ addMQTTConnector();
+ brokerService.start();
+
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("Test-Client");
+
+ {
+ BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+ connection.disconnect();
+ Thread.sleep(1000);
+ }
+ {
+ BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+ connection.disconnect();
+ Thread.sleep(1000);
+ }
+ }
+
@Override
protected String getProtocolScheme() {
return "mqtt";
Modified: activemq/trunk/activemq-mqtt/src/test/resources/log4j.properties
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/test/resources/log4j.properties?rev=1458900&r1=1458899&r2=1458900&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/test/resources/log4j.properties (original)
+++ activemq/trunk/activemq-mqtt/src/test/resources/log4j.properties Wed Mar 20
15:56:12 2013
@@ -20,9 +20,9 @@
#
log4j.rootLogger=INFO, out, stdout
-#log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
#log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG
#log4j.logger.org.apache.activemq.transport.failover=TRACE
+log4j.logger.org.apache.activemq.transport.mqtt=DEBUG
#log4j.logger.org.apache.activemq.store.jdbc=TRACE
#log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG
#log4j.logger.org.apache.activemq.store.jdbc.JDBCMessageStore=DEBUG