This is an automated email from the ASF dual-hosted git repository.
michaelpearce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 4484d05 ARTEMIS-2226 last consumer connection should close the
previous consumer connection
new ed05bbf This closes #2528
4484d05 is described below
commit 4484d05cf0d15c9b9388ea014b91d482b2f14a7b
Author: onlyMIT <[email protected]>
AuthorDate: Tue Jan 29 17:58:36 2019 +0800
ARTEMIS-2226 last consumer connection should close the previous consumer
connection
When multiple consumers use the same clientId in the cluster, the last
consumer connection should close the previous consumer connection.
ARTEMIS-2226 last consumer connection should close the previous consumer
connection
to address apache-rat-plugin:0.12:check
ARTEMIS-2226 last consumer connection should close the previous consumer
connection
to address checkstyle
ARTEMIS-2226 last consumer connection should close the previous consumer
connection
adjust the code structure
ARTEMIS-2226 last consumer connection should close the previous consumer
connection
adjust the code structure
ARTEMIS-2226 last consumer connection should close the previous consumer
connection
adjust the code structure
ARTEMIS-2226 last consumer connection should close the previous consumer
connection
adjust the code structure
ARTEMIS-2226 last consumer connection should close the previous consumer
connection
adjust the code structure
ARTEMIS-2226 last consumer connection should close the previous consumer
connection
add javadoc
---
.../api/core/management/ManagementHelper.java | 4 +
.../core/protocol/mqtt/MQTTConnectionManager.java | 9 +-
.../core/protocol/mqtt/MQTTProtocolManager.java | 60 +-
.../protocol/mqtt/MQTTProtocolManagerFactory.java | 6 +-
.../artemis/core/protocol/mqtt/MQTTSession.java | 10 +-
.../management/impl/ActiveMQServerControlImpl.java | 12 +-
.../core/remoting/impl/AbstractAcceptor.java | 5 +
.../cluster/impl/ClusterConnectionBridge.java | 16 +-
.../server/cluster/impl/ClusterConnectionImpl.java | 17 +
.../core/server/impl/ServerSessionImpl.java | 17 +
.../impl/NotificationActiveMQServerPlugin.java | 46 -
.../integration/management/NotificationTest.java | 1 -
.../tests/integration/mqtt/imported/MQTTTest.java | 12 +-
.../integration/mqtt/imported/MQTTTestSupport.java | 18 +
.../imported/MqttClusterRemoteSubscribeTest.java | 1032 ++++++++++++++++++--
15 files changed, 1093 insertions(+), 172 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
index bba8419..53cb087 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
@@ -84,6 +84,10 @@ public final class ManagementHelper {
public static final SimpleString HDR_MESSAGE_ID = new
SimpleString("_AMQ_Message_ID");
+ public static final SimpleString HDR_PROTOCOL_NAME = new
SimpleString("_AMQ_Protocol_Name");
+
+ public static final SimpleString HDR_CLIENT_ID = new
SimpleString("_AMQ_Client_ID");
+
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
index bc511ea..8efea0a 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -162,14 +162,7 @@ public class MQTTConnectionManager {
}
private MQTTSessionState getSessionState(String clientId) {
- /* [MQTT-3.1.2-4] Attach an existing session if one exists otherwise
create a new one. */
- MQTTSessionState state = MQTTSession.SESSIONS.get(clientId);
- if (state == null) {
- state = new MQTTSessionState(clientId);
- MQTTSession.SESSIONS.put(clientId, state);
- }
-
- return state;
+ return session.getProtocolManager().getSessionState(clientId);
}
private String validateClientId(String clientId, boolean cleanSession) {
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
index 71d30d8..6e91443 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
@@ -18,9 +18,9 @@ package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -30,6 +30,9 @@ import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
+import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import
org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.management.Notification;
@@ -40,11 +43,12 @@ import
org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.utils.collections.TypedProperties;
/**
* MQTTProtocolManager
*/
-class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage,
MQTTInterceptor, MQTTConnection> implements NotificationListener {
+public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage,
MQTTInterceptor, MQTTConnection> implements NotificationListener {
private static final List<String> websocketRegistryNames =
Arrays.asList("mqtt", "mqttv3.1");
@@ -55,18 +59,53 @@ class MQTTProtocolManager extends
AbstractProtocolManager<MqttMessage, MQTTInter
private final List<MQTTInterceptor> outgoingInterceptors = new
ArrayList<>();
//TODO Read in a list of existing client IDs from stored Sessions.
- private Map<String, MQTTConnection> connectedClients = new
ConcurrentHashMap<>();
+ private final Map<String, MQTTConnection> connectedClients;
+ private final Map<String, MQTTSessionState> sessionStates;
MQTTProtocolManager(ActiveMQServer server,
+ Map<String, MQTTConnection> connectedClients,
+ Map<String, MQTTSessionState> sessionStates,
List<BaseInterceptor> incomingInterceptors,
List<BaseInterceptor> outgoingInterceptors) {
this.server = server;
+ this.connectedClients = connectedClients;
+ this.sessionStates = sessionStates;
this.updateInterceptors(incomingInterceptors, outgoingInterceptors);
+ server.getManagementService().addNotificationListener(this);
}
@Override
public void onNotification(Notification notification) {
- // TODO handle notifications
+ if (!(notification.getType() instanceof CoreNotificationType))
+ return;
+
+ CoreNotificationType type = (CoreNotificationType)
notification.getType();
+ if (type != CoreNotificationType.SESSION_CREATED)
+ return;
+
+ TypedProperties props = notification.getProperties();
+
+ SimpleString protocolName =
props.getSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME);
+
+ //Only process SESSION_CREATED notifications for the MQTT protocol
+ if (protocolName == null ||
!protocolName.toString().equals(MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME))
+ return;
+
+ int distance = props.getIntProperty(ManagementHelper.HDR_DISTANCE);
+
+ //distance > 0 means the notification was received from another node in
the cluster; only such notifications are processed here
+ if (distance > 0) {
+ String clientId =
props.getSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID).toString();
+ /*
+ * If this node has a connection whose clientId equals the
value of the "_AMQ_Client_ID" attribute
+ * in the SESSION_CREATED notification, that connection must be
closed.
+ * This prevents consumers with the same client ID from being connected
to different nodes of the cluster at the same time.
+ */
+ MQTTConnection mqttConnection = connectedClients.get(clientId);
+ if (mqttConnection != null) {
+ mqttConnection.destroy();
+ }
+ }
}
@Override
@@ -201,4 +240,17 @@ class MQTTProtocolManager extends
AbstractProtocolManager<MqttMessage, MQTTInter
public MQTTConnection addConnectedClient(String clientId, MQTTConnection
connection) {
return connectedClients.put(clientId, connection);
}
+
+ public MQTTSessionState getSessionState(String clientId) {
+ /* [MQTT-3.1.2-4] Attach an existing session if one exists otherwise
create a new one. */
+ return sessionStates.computeIfAbsent(clientId, MQTTSessionState::new);
+ }
+
+ public MQTTSessionState removeSessionState(String clientId) {
+ return sessionStates.remove(clientId);
+ }
+
+ public Map<String, MQTTSessionState> getSessionStates() {
+ return new HashMap<>(sessionStates);
+ }
}
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
index 453b267..74a29e6 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -37,12 +38,15 @@ public class MQTTProtocolManagerFactory extends
AbstractProtocolManagerFactory<M
private static final String[] SUPPORTED_PROTOCOLS = {MQTT_PROTOCOL_NAME};
+ private final Map<String, MQTTConnection> connectedClients = new
ConcurrentHashMap<>();
+ private final Map<String, MQTTSessionState> sessionStates = new
ConcurrentHashMap<>();
+
@Override
public ProtocolManager createProtocolManager(ActiveMQServer server,
final Map<String, Object>
parameters,
List<BaseInterceptor>
incomingInterceptors,
List<BaseInterceptor>
outgoingInterceptors) throws Exception {
- return BeanSupport.setData(new MQTTProtocolManager(server,
incomingInterceptors, outgoingInterceptors), parameters);
+ return BeanSupport.setData(new MQTTProtocolManager(server,
connectedClients, sessionStates, incomingInterceptors, outgoingInterceptors),
parameters);
}
@Override
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index 640b893..b788f36 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -17,10 +17,7 @@
package org.apache.activemq.artemis.core.protocol.mqtt;
-import java.util.HashMap;
-import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
@@ -30,8 +27,6 @@ import
org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
public class MQTTSession {
- static Map<String, MQTTSessionState> SESSIONS = new ConcurrentHashMap<>();
-
private final String id = UUID.randomUUID().toString();
private MQTTProtocolHandler protocolHandler;
@@ -108,7 +103,7 @@ public class MQTTSession {
if (isClean()) {
clean();
- SESSIONS.remove(connection.getClientID());
+ protocolManager.removeSessionState(connection.getClientID());
}
}
stopped = true;
@@ -201,7 +196,4 @@ public class MQTTSession {
return coreMessageObjectPools;
}
- public static Map<String, MQTTSessionState> getSessions() {
- return new HashMap<>(SESSIONS);
- }
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 318d880..26b4eca 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -59,6 +59,7 @@ import
org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.BridgeControl;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.DivertControl;
+import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.Parameter;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.client.impl.Topology;
@@ -2967,7 +2968,16 @@ public class ActiveMQServerControlImpl extends
AbstractControl implements Active
if (!(notification.getType() instanceof CoreNotificationType))
return;
CoreNotificationType type = (CoreNotificationType)
notification.getType();
- TypedProperties prop = notification.getProperties();
+ if (type == CoreNotificationType.SESSION_CREATED) {
+ TypedProperties props = notification.getProperties();
+ /*
+ * If the SESSION_CREATED notification was received from another node
in the cluster, it is not broadcast.
+ * This keeps the original behavior and avoids broadcasting the same
SESSION_CREATED notification multiple times in the cluster.
+ */
+ if (props.getIntProperty(ManagementHelper.HDR_DISTANCE) > 0) {
+ return;
+ }
+ }
this.broadcaster.sendNotification(new Notification(type.toString(),
this, notifSeq.incrementAndGet(), notification.toString()));
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/AbstractAcceptor.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/AbstractAcceptor.java
index a2f30f3..1aa1dff 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/AbstractAcceptor.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/AbstractAcceptor.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.remoting.impl;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -43,4 +44,8 @@ public abstract class AbstractAcceptor implements Acceptor {
}
}
+ public Map<String, ProtocolManager> getProtocolMap() {
+ return Collections.unmodifiableMap(protocolMap);
+ }
+
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index a9d80e5..f7e2817 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -233,6 +233,8 @@ public class ClusterConnectionBridge extends BridgeImpl {
" AND " +
ManagementHelper.HDR_NOTIFICATION_TYPE +
" IN ('" +
+
CoreNotificationType.SESSION_CREATED +
+ "','" +
CoreNotificationType.BINDING_ADDED +
"','" +
CoreNotificationType.BINDING_REMOVED +
@@ -252,6 +254,8 @@ public class ClusterConnectionBridge extends BridgeImpl {
flowRecord.getMaxHops() +
" AND (" +
createSelectorFromAddress(appendIgnoresToFilter(flowRecord.getAddress())) +
+ ") AND (" +
+
createPermissiveManagementNotificationToFilter() +
")");
sessionConsumer.createTemporaryQueue(managementNotificationAddress,
notifQueueName, filter);
@@ -351,10 +355,20 @@ public class ClusterConnectionBridge extends BridgeImpl {
}
filterString += "!" + storeAndForwardPrefix;
filterString += ",!" + managementAddress;
- filterString += ",!" + managementNotificationAddress;
return filterString;
}
+ /**
+ * Creates a filter rule that matches SESSION_CREATED notifications and
excludes all other notifications that use managementNotificationAddress
+ * as the routing address.
+ * @return the filter string
+ */
+ private String createPermissiveManagementNotificationToFilter() {
+ StringBuilder filterBuilder = new
StringBuilder(ManagementHelper.HDR_NOTIFICATION_TYPE).append(" = '")
+ .append(CoreNotificationType.SESSION_CREATED).append("' OR
(").append(ManagementHelper.HDR_ADDRESS)
+ .append(" NOT LIKE
'").append(managementNotificationAddress).append("%')");
+ return filterBuilder.toString();
+ }
@Override
protected void nodeUP(TopologyMember member, boolean last) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 4b884b5..aa68b81 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -1078,6 +1078,10 @@ public final class ClusterConnectionImpl implements
ClusterConnection, AfterConn
doUnProposalReceived(message);
break;
}
+ case SESSION_CREATED: {
+ doSessionCreated(message);
+ break;
+ }
default: {
throw ActiveMQMessageBundle.BUNDLE.invalidType(ntype);
}
@@ -1303,6 +1307,19 @@ public final class ClusterConnectionImpl implements
ClusterConnection, AfterConn
binding.disconnect();
}
+ private synchronized void doSessionCreated(final ClientMessage message)
throws Exception {
+ if (logger.isTraceEnabled()) {
+ logger.trace(ClusterConnectionImpl.this + " session created " +
message);
+ }
+ TypedProperties props = new TypedProperties();
+ props.putSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME,
message.getSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME));
+ props.putSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS,
message.getSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS));
+ props.putSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID,
message.getSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID));
+ props.putSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME,
message.getSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME));
+ props.putIntProperty(ManagementHelper.HDR_DISTANCE,
message.getIntProperty(ManagementHelper.HDR_DISTANCE) + 1);
+ managementService.sendNotification(new Notification(null,
CoreNotificationType.SESSION_CREATED, props));
+ }
+
private synchronized void doConsumerCreated(final ClientMessage message)
throws Exception {
if (logger.isTraceEnabled()) {
logger.trace(ClusterConnectionImpl.this + " Consumer created " +
message);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 6464eee..04322df 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -272,6 +272,8 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
if (!xa) {
tx = newTransaction();
}
+ //Once the ServerSessionImpl initialization is complete, create
and send a SESSION_CREATED notification.
+ sendSessionNotification(CoreNotificationType.SESSION_CREATED);
}
// ServerSession implementation
---------------------------------------------------------------------------
@@ -422,6 +424,8 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
}
closed = true;
+ //Once the ServerSessionImpl is closed, create and send a
SESSION_CLOSED notification.
+ sendSessionNotification(CoreNotificationType.SESSION_CLOSED);
if (server.hasBrokerSessionPlugins()) {
server.callBrokerSessionPlugins(plugin ->
plugin.afterCloseSession(this, failed));
@@ -429,6 +433,19 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
}
}
+ private void sendSessionNotification(final CoreNotificationType type)
throws Exception {
+ final TypedProperties props = new TypedProperties();
+ props.putSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME,
SimpleString.toSimpleString(this.getConnectionID().toString()));
+ props.putSimpleStringProperty(ManagementHelper.HDR_USER,
SimpleString.toSimpleString(this.getUsername()));
+ props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME,
SimpleString.toSimpleString(this.getName()));
+
+ props.putSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID,
SimpleString.toSimpleString(this.remotingConnection.getClientID()));
+ props.putSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME,
SimpleString.toSimpleString(this.remotingConnection.getProtocolName()));
+ props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS,
managementService.getManagementNotificationAddress());
+ props.putIntProperty(ManagementHelper.HDR_DISTANCE, 0);
+ managementService.sendNotification(new Notification(null, type, props));
+ }
+
private void securityCheck(SimpleString address, CheckType checkType,
SecurityAuth auth) throws Exception {
if (securityEnabled) {
securityStore.check(address, checkType, auth);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/NotificationActiveMQServerPlugin.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/NotificationActiveMQServerPlugin.java
index 29846aa..880f970 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/NotificationActiveMQServerPlugin.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/NotificationActiveMQServerPlugin.java
@@ -26,7 +26,6 @@ import
org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
@@ -43,13 +42,11 @@ public class NotificationActiveMQServerPlugin implements
ActiveMQServerPlugin {
private static final Logger logger =
Logger.getLogger(NotificationActiveMQServerPlugin.class);
public static final String SEND_CONNECTION_NOTIFICATIONS =
"SEND_CONNECTION_NOTIFICATIONS";
- public static final String SEND_SESSION_NOTIFICATIONS =
"SEND_SESSION_NOTIFICATIONS";
public static final String SEND_ADDRESS_NOTIFICATIONS =
"SEND_ADDRESS_NOTIFICATIONS";
public static final String SEND_DELIVERED_NOTIFICATIONS =
"SEND_DELIVERED_NOTIFICATIONS";
public static final String SEND_EXPIRED_NOTIFICATIONS =
"SEND_EXPIRED_NOTIFICATIONS";
private boolean sendConnectionNotifications;
- private boolean sendSessionNotifications;
private boolean sendAddressNotifications;
private boolean sendDeliveredNotifications;
private boolean sendExpiredNotifications;
@@ -66,8 +63,6 @@ public class NotificationActiveMQServerPlugin implements
ActiveMQServerPlugin {
public void init(Map<String, String> properties) {
sendConnectionNotifications =
Boolean.parseBoolean(properties.getOrDefault(SEND_CONNECTION_NOTIFICATIONS,
Boolean.FALSE.toString()));
- sendSessionNotifications =
Boolean.parseBoolean(properties.getOrDefault(SEND_SESSION_NOTIFICATIONS,
- Boolean.FALSE.toString()));
sendAddressNotifications =
Boolean.parseBoolean(properties.getOrDefault(SEND_ADDRESS_NOTIFICATIONS,
Boolean.FALSE.toString()));
sendDeliveredNotifications =
Boolean.parseBoolean(properties.getOrDefault(SEND_DELIVERED_NOTIFICATIONS,
@@ -97,16 +92,6 @@ public class NotificationActiveMQServerPlugin implements
ActiveMQServerPlugin {
}
@Override
- public void afterCreateSession(ServerSession session) throws
ActiveMQException {
- sendSessionNotification(session, CoreNotificationType.SESSION_CREATED);
- }
-
- @Override
- public void afterCloseSession(ServerSession session, boolean failed) throws
ActiveMQException {
- sendSessionNotification(session, CoreNotificationType.SESSION_CLOSED);
- }
-
- @Override
public void afterAddAddress(AddressInfo addressInfo, boolean reload) throws
ActiveMQException {
sendAddressNotification(addressInfo, CoreNotificationType.ADDRESS_ADDED);
}
@@ -196,23 +181,6 @@ public class NotificationActiveMQServerPlugin implements
ActiveMQServerPlugin {
}
}
- private void sendSessionNotification(final ServerSession session, final
CoreNotificationType type) {
- final ManagementService managementService = getManagementService();
-
- if (managementService != null && sendSessionNotifications) {
- try {
- final TypedProperties props = new TypedProperties();
-
props.putSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME,
SimpleString.toSimpleString(session.getConnectionID().toString()));
- props.putSimpleStringProperty(ManagementHelper.HDR_USER,
SimpleString.toSimpleString(session.getUsername()));
- props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME,
SimpleString.toSimpleString(session.getName()));
-
- managementService.sendNotification(new Notification(null, type,
props));
- } catch (Exception e) {
- logger.warn("Error sending notification: " + type, e.getMessage(),
e);
- }
- }
- }
-
/**
* @return the sendConnectionNotifications
*/
@@ -228,20 +196,6 @@ public class NotificationActiveMQServerPlugin implements
ActiveMQServerPlugin {
}
/**
- * @return the sendSessionNotifications
- */
- public boolean isSendSessionNotifications() {
- return sendSessionNotifications;
- }
-
- /**
- * @param sendSessionNotifications the sendSessionNotifications to set
- */
- public void setSendSessionNotifications(boolean sendSessionNotifications) {
- this.sendSessionNotifications = sendSessionNotifications;
- }
-
- /**
* @return the sendDeliveredNotifications
*/
public boolean isSendDeliveredNotifications() {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java
index ed5713e..196e939 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java
@@ -362,7 +362,6 @@ public class NotificationTest extends ActiveMQTestBase {
NotificationActiveMQServerPlugin notificationPlugin = new
NotificationActiveMQServerPlugin();
notificationPlugin.setSendAddressNotifications(true);
notificationPlugin.setSendConnectionNotifications(true);
- notificationPlugin.setSendSessionNotifications(true);
notificationPlugin.setSendDeliveredNotifications(true);
notificationPlugin.setSendExpiredNotifications(true);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index 03bcddd..5b35f33 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -1120,7 +1120,7 @@ public class MQTTTest extends MQTTTestSupport {
notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
notClean.disconnect();
- assertEquals(1, MQTTSession.getSessions().size());
+ assertEquals(1, getSessions().size());
// MUST receive message from existing subscription from previous not
clean session
notClean = mqttNotClean.blockingConnection();
@@ -1132,7 +1132,7 @@ public class MQTTTest extends MQTTTestSupport {
notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
notClean.disconnect();
- assertEquals(1, MQTTSession.getSessions().size());
+ assertEquals(1, getSessions().size());
// MUST NOT receive message from previous not clean session as existing
subscription should be gone
final MQTT mqttClean = createMQTTConnection(CLIENTID, true);
@@ -1144,7 +1144,7 @@ public class MQTTTest extends MQTTTestSupport {
clean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
clean.disconnect();
- assertEquals(0, MQTTSession.getSessions().size());
+ assertEquals(0, getSessions().size());
// MUST NOT receive message from previous clean session as existing
subscription should be gone
notClean = mqttNotClean.blockingConnection();
@@ -1153,7 +1153,7 @@ public class MQTTTest extends MQTTTestSupport {
assertNull(msg);
notClean.disconnect();
- assertEquals(1, MQTTSession.getSessions().size());
+ assertEquals(1, getSessions().size());
}
@Test(timeout = 60 * 1000)
@@ -1167,7 +1167,7 @@ public class MQTTTest extends MQTTTestSupport {
notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
notClean.disconnect();
- assertEquals(1, MQTTSession.getSessions().size());
+ assertEquals(1, getSessions().size());
// MUST NOT receive message from previous not clean session even when
creating a new subscription
final MQTT mqttClean = createMQTTConnection(CLIENTID, true);
@@ -1179,7 +1179,7 @@ public class MQTTTest extends MQTTTestSupport {
clean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
clean.disconnect();
- assertEquals(0, MQTTSession.getSessions().size());
+ assertEquals(0, getSessions().size());
}
@Test(timeout = 60 * 1000)
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
index e49ec92..83871e3 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
@@ -28,6 +28,7 @@ import java.security.ProtectionDomain;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -42,13 +43,18 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionState;
+import org.apache.activemq.artemis.core.remoting.impl.AbstractAcceptor;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.fusesource.mqtt.client.MQTT;
@@ -366,6 +372,18 @@ public class MQTTTestSupport extends ActiveMQTestBase {
return mqtt;
}
+ public Map<String, MQTTSessionState> getSessions() {
+ Acceptor acceptor = server.getRemotingService().getAcceptor("MQTT");
+ if (acceptor instanceof AbstractAcceptor) {
+ ProtocolManager protocolManager = ((AbstractAcceptor) acceptor).getProtocolMap().get("MQTT");
+ if (protocolManager instanceof MQTTProtocolManager) {
+ return ((MQTTProtocolManager) protocolManager).getSessionStates();
+ }
+
+ }
+ return Collections.emptyMap();
+ }
+
private MQTT createMQTTSslConnection(String clientId, boolean clean) throws Exception {
MQTT mqtt = new MQTT();
mqtt.setConnectAttemptsMax(1);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java
index 8caba17..19360b1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
@@ -43,29 +44,127 @@ public class MqttClusterRemoteSubscribeTest extends
ClusterTestBase {
}
@Test
- public void unsubscribeRemoteQueue() throws Exception {
- final String TOPIC = "test/1/some/la";
+ public void useSameClientIdAndAnycastSubscribeRemoteQueue() throws
Exception {
+ final String ANYCAST_TOPIC = "anycast/test/1/some/la";
+ final String subClientId = "subClientId";
+ final String pubClientId = "pubClientId";
- setupServers(TOPIC);
+ setupServers(ANYCAST_TOPIC);
+
+ startServers(0, 1);
+
+ BlockingConnection subConnection1 = null;
+ BlockingConnection subConnection2 = null;
+ BlockingConnection pubConnection = null;
+ try {
+ //Waiting for resource initialization to complete
+ Thread.sleep(5000);
+ Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)};
+ subConnection1 = retrieveMQTTConnection("tcp://localhost:61616",
subClientId);
+ subConnection2 = retrieveMQTTConnection("tcp://localhost:61617",
subClientId);
+ pubConnection = retrieveMQTTConnection("tcp://localhost:61616",
pubClientId);
+
+ //Waiting for the first sub connection to be closed
+ assertTrue(waitConnectionClosed(subConnection1));
+
+ subConnection2.subscribe(topics);
+
+ waitForBindings(0, ANYCAST_TOPIC, 1, 0, true);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 1, true);
+
+ waitForBindings(0, ANYCAST_TOPIC, 1, 1, false);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 0, false);
+
+ // Publish Messages
+ String payload1 = "This is message 1";
+ String payload2 = "This is message 2";
+ String payload3 = "This is message 3";
+
+ pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(),
QoS.AT_LEAST_ONCE, false);
+ pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(),
QoS.AT_MOST_ONCE, false);
+ pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(),
QoS.AT_MOST_ONCE, false);
+
+ Message message1 = subConnection2.receive(5, TimeUnit.SECONDS);
+ message1.ack();
+ Message message2 = subConnection2.receive(5, TimeUnit.SECONDS);
+ message2.ack();
+ Message message3 = subConnection2.receive(5, TimeUnit.SECONDS);
+ message3.ack();
+
+ assertEquals(payload1, new String(message1.getPayload()));
+ assertEquals(payload2, new String(message2.getPayload()));
+ assertEquals(payload3, new String(message3.getPayload()));
+
+ subConnection2.unsubscribe(new String[]{ANYCAST_TOPIC});
+
+ waitForBindings(0, ANYCAST_TOPIC, 1, 0, true);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 0, true);
+
+ waitForBindings(0, ANYCAST_TOPIC, 1, 0, false);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 0, false);
+
+ pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(),
QoS.AT_LEAST_ONCE, false);
+ pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(),
QoS.AT_MOST_ONCE, false);
+ pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(),
QoS.AT_MOST_ONCE, false);
+
+ Message message11 = subConnection2.receive(5, TimeUnit.SECONDS);
+ assertNull(message11);
+ Message message21 = subConnection2.receive(5, TimeUnit.SECONDS);
+ assertNull(message21);
+ Message message31 = subConnection2.receive(5, TimeUnit.SECONDS);
+ assertNull(message31);
+
+ } finally {
+ String[] topics = new String[]{ANYCAST_TOPIC};
+ if (subConnection1 != null && subConnection1.isConnected()) {
+ subConnection1.unsubscribe(topics);
+ subConnection1.disconnect();
+ }
+ if (subConnection2 != null && subConnection2.isConnected()) {
+ subConnection2.unsubscribe(topics);
+ subConnection2.disconnect();
+ }
+ if (pubConnection != null && pubConnection.isConnected()) {
+ pubConnection.disconnect();
+ }
+ }
+
+ }
+
+ @Test
+ public void useDiffClientIdAndAnycastSubscribeRemoteQueue() throws
Exception {
+ final String ANYCAST_TOPIC = "anycast/test/1/some/la";
+ final String clientId1 = "clientId1";
+ final String clientId2 = "clientId2";
+
+ setupServers(ANYCAST_TOPIC);
startServers(0, 1);
BlockingConnection connection1 = null;
BlockingConnection connection2 = null;
try {
-
- connection1 = retrieveMQTTConnection("tcp://localhost:61616");
- connection2 = retrieveMQTTConnection("tcp://localhost:61617");
+ //Waiting for resource initialization to complete
+ Thread.sleep(5000);
+ Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)};
+ connection1 = retrieveMQTTConnection("tcp://localhost:61616",
clientId1);
+ connection2 = retrieveMQTTConnection("tcp://localhost:61617",
clientId2);
// Subscribe to topics
- Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)};
connection1.subscribe(topics);
+
+ waitForBindings(0, ANYCAST_TOPIC, 1, 1, true);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 0, true);
+
+ waitForBindings(0, ANYCAST_TOPIC, 1, 0, false);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 1, false);
+
connection2.subscribe(topics);
- waitForBindings(0, TOPIC, 1, 1, true);
- waitForBindings(1, TOPIC, 1, 1, true);
+ waitForBindings(0, ANYCAST_TOPIC, 1, 1, true);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 1, true);
- waitForBindings(0, TOPIC, 1, 1, false);
- waitForBindings(1, TOPIC, 1, 1, false);
+ waitForBindings(0, ANYCAST_TOPIC, 1, 1, false);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 1, false);
// Publish Messages
@@ -73,9 +172,9 @@ public class MqttClusterRemoteSubscribeTest extends
ClusterTestBase {
String payload2 = "This is message 2";
String payload3 = "This is message 3";
- connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE,
false);
- connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE,
false);
- connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE,
false);
+ connection1.publish(ANYCAST_TOPIC, payload1.getBytes(),
QoS.AT_LEAST_ONCE, false);
+ connection1.publish(ANYCAST_TOPIC, payload2.getBytes(),
QoS.AT_MOST_ONCE, false);
+ connection1.publish(ANYCAST_TOPIC, payload3.getBytes(),
QoS.AT_MOST_ONCE, false);
Message message1 = connection1.receive(5, TimeUnit.SECONDS);
@@ -89,12 +188,17 @@ public class MqttClusterRemoteSubscribeTest extends
ClusterTestBase {
assertEquals(payload2, new String(message2.getPayload()));
assertEquals(payload3, new String(message3.getPayload()));
+ connection2.unsubscribe(new String[]{ANYCAST_TOPIC});
+
+ waitForBindings(0, ANYCAST_TOPIC, 1, 1, true);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 0, true);
- connection2.unsubscribe(new String[]{TOPIC});
+ waitForBindings(0, ANYCAST_TOPIC, 1, 0, false);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 1, false);
- connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE,
false);
- connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE,
false);
- connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE,
false);
+ connection1.publish(ANYCAST_TOPIC, payload1.getBytes(),
QoS.AT_LEAST_ONCE, false);
+ connection1.publish(ANYCAST_TOPIC, payload2.getBytes(),
QoS.AT_MOST_ONCE, false);
+ connection1.publish(ANYCAST_TOPIC, payload3.getBytes(),
QoS.AT_MOST_ONCE, false);
Message message11 = connection1.receive(5, TimeUnit.SECONDS);
message11.ack();
@@ -103,7 +207,6 @@ public class MqttClusterRemoteSubscribeTest extends
ClusterTestBase {
Message message31 = connection1.receive(5, TimeUnit.SECONDS);
message31.ack();
-
String message11String = new String(message11.getPayload());
String message21String = new String(message21.getPayload());
String message31String = new String(message31.getPayload());
@@ -111,14 +214,13 @@ public class MqttClusterRemoteSubscribeTest extends
ClusterTestBase {
assertTrue(payload2.equals(message11String) ||
payload2.equals(message21String) || payload2.equals(message31String) );
assertTrue(payload3.equals(message11String) ||
payload3.equals(message21String) || payload3.equals(message31String) );
-
} finally {
- String[] topics = new String[]{TOPIC};
- if (connection1 != null) {
+ String[] topics = new String[]{ANYCAST_TOPIC};
+ if (connection1 != null && connection1.isConnected()) {
connection1.unsubscribe(topics);
connection1.disconnect();
}
- if (connection2 != null) {
+ if (connection2 != null && connection2.isConnected()) {
connection2.unsubscribe(topics);
connection2.disconnect();
}
@@ -127,29 +229,320 @@ public class MqttClusterRemoteSubscribeTest extends
ClusterTestBase {
}
@Test
- public void unsubscribeRemoteQueueWildCard() throws Exception {
- final String TOPIC = "test/+/some/#";
+ public void useSameClientIdAndMulticastSubscribeRemoteQueue() throws
Exception {
+ final String MULTICAST_TOPIC = "multicast/test/1/some/la";
+ final String ANYCAST_TOPIC = "anycast/test/1/some/la";
+ final String subClientId = "subClientId";
+ final String pubClientId = "pubClientId";
+
+ setupServers(ANYCAST_TOPIC);
+
+ startServers(0, 1);
+
+ BlockingConnection subConnection1 = null;
+ BlockingConnection subConnection2 = null;
+ BlockingConnection pubConnection = null;
+ try {
+ //Waiting for resource initialization to complete
+ Thread.sleep(5000);
+ Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)};
+ subConnection1 = retrieveMQTTConnection("tcp://localhost:61616",
subClientId);
+ subConnection2 = retrieveMQTTConnection("tcp://localhost:61617",
subClientId);
+ pubConnection = retrieveMQTTConnection("tcp://localhost:61616",
pubClientId);
+
+ //Waiting for the first sub connection to be closed
+ assertTrue(waitConnectionClosed(subConnection1));
+
+ subConnection2.subscribe(topics);
+
+ waitForBindings(0, MULTICAST_TOPIC, 0, 0, true);
+ waitForBindings(1, MULTICAST_TOPIC, 1, 1, true);
+
+ waitForBindings(0, MULTICAST_TOPIC, 1, 1, false);
+ waitForBindings(1, MULTICAST_TOPIC, 0, 0, false);
+
+ // Publish Messages
+ String payload1 = "This is message 1";
+ String payload2 = "This is message 2";
+ String payload3 = "This is message 3";
+
+ pubConnection.publish(MULTICAST_TOPIC, payload1.getBytes(),
QoS.AT_LEAST_ONCE, false);
+ pubConnection.publish(MULTICAST_TOPIC, payload2.getBytes(),
QoS.AT_MOST_ONCE, false);
+ pubConnection.publish(MULTICAST_TOPIC, payload3.getBytes(),
QoS.AT_MOST_ONCE, false);
+
+ Message message1 = subConnection2.receive(5, TimeUnit.SECONDS);
+ message1.ack();
+ Message message2 = subConnection2.receive(5, TimeUnit.SECONDS);
+ message2.ack();
+ Message message3 = subConnection2.receive(5, TimeUnit.SECONDS);
+ message3.ack();
+
+ assertEquals(payload1, new String(message1.getPayload()));
+ assertEquals(payload2, new String(message2.getPayload()));
+ assertEquals(payload3, new String(message3.getPayload()));
+
+ subConnection2.unsubscribe(new String[]{MULTICAST_TOPIC});
+
+ waitForBindings(0, MULTICAST_TOPIC, 0, 0, true);
+ waitForBindings(1, MULTICAST_TOPIC, 0, 0, true);
+
+ waitForBindings(0, MULTICAST_TOPIC, 0, 0, false);
+ waitForBindings(1, MULTICAST_TOPIC, 0, 0, false);
+
+ pubConnection.publish(MULTICAST_TOPIC, payload1.getBytes(),
QoS.AT_LEAST_ONCE, false);
+ pubConnection.publish(MULTICAST_TOPIC, payload2.getBytes(),
QoS.AT_MOST_ONCE, false);
+ pubConnection.publish(MULTICAST_TOPIC, payload3.getBytes(),
QoS.AT_MOST_ONCE, false);
+
+ Message message11 = subConnection2.receive(5, TimeUnit.SECONDS);
+ assertNull(message11);
+ Message message21 = subConnection2.receive(5, TimeUnit.SECONDS);
+ assertNull(message21);
+ Message message31 = subConnection2.receive(5, TimeUnit.SECONDS);
+ assertNull(message31);
+
+ } finally {
+ String[] topics = new String[]{MULTICAST_TOPIC};
+ if (subConnection1 != null && subConnection1.isConnected()) {
+ subConnection1.unsubscribe(topics);
+ subConnection1.disconnect();
+ }
+ if (subConnection2 != null && subConnection2.isConnected()) {
+ subConnection2.unsubscribe(topics);
+ subConnection2.disconnect();
+ }
+ if (pubConnection != null && pubConnection.isConnected()) {
+ pubConnection.disconnect();
+ }
+ }
+
+ }
+
+ @Test
+ public void useDiffClientIdAndMulticastSubscribeRemoteQueue() throws
Exception {
+ final String MULTICAST_TOPIC = "multicast/test/1/some/la";
+ final String ANYCAST_TOPIC = "anycast/test/1/some/la";
+ final String clientId1 = "clientId1";
+ final String clientId2 = "clientId2";
- setupServers(TOPIC);
+ setupServers(ANYCAST_TOPIC);
startServers(0, 1);
BlockingConnection connection1 = null;
BlockingConnection connection2 = null;
try {
+ //Waiting for resource initialization to complete
+ Thread.sleep(5000);
+ Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)};
+ connection1 = retrieveMQTTConnection("tcp://localhost:61616",
clientId1);
+ connection2 = retrieveMQTTConnection("tcp://localhost:61617",
clientId2);
+ // Subscribe to topics
+ connection1.subscribe(topics);
+
+ waitForBindings(0, MULTICAST_TOPIC, 1, 1, true);
+ waitForBindings(1, MULTICAST_TOPIC, 0, 0, true);
+
+ waitForBindings(0, MULTICAST_TOPIC, 0, 0, false);
+ waitForBindings(1, MULTICAST_TOPIC, 1, 1, false);
+
+ connection2.subscribe(topics);
+
+ waitForBindings(0, MULTICAST_TOPIC, 1, 1, true);
+ waitForBindings(1, MULTICAST_TOPIC, 1, 1, true);
+
+ waitForBindings(0, MULTICAST_TOPIC, 1, 1, false);
+ waitForBindings(1, MULTICAST_TOPIC, 1, 1, false);
+
+ // Publish Messages
+ String payload1 = "This is message 1";
+ String payload2 = "This is message 2";
+ String payload3 = "This is message 3";
+
+ connection1.publish(MULTICAST_TOPIC, payload1.getBytes(),
QoS.AT_LEAST_ONCE, false);
+ connection1.publish(MULTICAST_TOPIC, payload2.getBytes(),
QoS.AT_MOST_ONCE, false);
+ connection1.publish(MULTICAST_TOPIC, payload3.getBytes(),
QoS.AT_MOST_ONCE, false);
+
+ Message message11 = connection1.receive(5, TimeUnit.SECONDS);
+ message11.ack();
+ Message message12 = connection1.receive(5, TimeUnit.SECONDS);
+ message12.ack();
+ Message message13 = connection1.receive(5, TimeUnit.SECONDS);
+ message13.ack();
+
+ assertEquals(payload1, new String(message11.getPayload()));
+ assertEquals(payload2, new String(message12.getPayload()));
+ assertEquals(payload3, new String(message13.getPayload()));
+
+ Message message21 = connection2.receive(5, TimeUnit.SECONDS);
+ message21.ack();
+ Message message22 = connection2.receive(5, TimeUnit.SECONDS);
+ message22.ack();
+ Message message23 = connection2.receive(5, TimeUnit.SECONDS);
+ message23.ack();
+
+ assertEquals(payload1, new String(message21.getPayload()));
+ assertEquals(payload2, new String(message22.getPayload()));
+ assertEquals(payload3, new String(message23.getPayload()));
+
+ connection2.unsubscribe(new String[]{MULTICAST_TOPIC});
+
+ waitForBindings(0, MULTICAST_TOPIC, 1, 1, true);
+ waitForBindings(1, MULTICAST_TOPIC, 0, 0, true);
+
+ waitForBindings(0, MULTICAST_TOPIC, 0, 0, false);
+ waitForBindings(1, MULTICAST_TOPIC, 1, 1, false);
+
+ connection1.publish(MULTICAST_TOPIC, payload1.getBytes(),
QoS.AT_LEAST_ONCE, false);
+ connection1.publish(MULTICAST_TOPIC, payload2.getBytes(),
QoS.AT_MOST_ONCE, false);
+ connection1.publish(MULTICAST_TOPIC, payload3.getBytes(),
QoS.AT_MOST_ONCE, false);
+
+ Message message31 = connection1.receive(5, TimeUnit.SECONDS);
+ message31.ack();
+ Message message32 = connection1.receive(5, TimeUnit.SECONDS);
+ message32.ack();
+ Message message33 = connection1.receive(5, TimeUnit.SECONDS);
+ message33.ack();
+
+ assertEquals(payload1, new String(message31.getPayload()));
+ assertEquals(payload2, new String(message32.getPayload()));
+ assertEquals(payload3, new String(message33.getPayload()));
+
+ } finally {
+ String[] topics = new String[]{MULTICAST_TOPIC};
+ if (connection1 != null && connection1.isConnected()) {
+ connection1.unsubscribe(topics);
+ connection1.disconnect();
+ }
+ if (connection2 != null && connection2.isConnected()) {
+ connection2.unsubscribe(topics);
+ connection2.disconnect();
+ }
+ }
+
+ }
+
+ @Test
+ public void useSameClientIdAndAnycastSubscribeRemoteQueueWildCard() throws
Exception {
+ final String ANYCAST_TOPIC = "anycast/test/+/some/#";
+ final String subClientId = "subClientId";
+ final String pubClientId = "pubClientId";
+
+ setupServers(ANYCAST_TOPIC);
+
+ startServers(0, 1);
+
+ BlockingConnection subConnection1 = null;
+ BlockingConnection subConnection2 = null;
+ BlockingConnection pubConnection = null;
+ try {
+ //Waiting for resource initialization to complete
+ Thread.sleep(5000);
+ Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)};
+ subConnection1 = retrieveMQTTConnection("tcp://localhost:61616",
subClientId);
+ subConnection2 = retrieveMQTTConnection("tcp://localhost:61617",
subClientId);
+ pubConnection = retrieveMQTTConnection("tcp://localhost:61616",
pubClientId);
+
+ //Waiting for the first sub connection to be closed
+ assertTrue(waitConnectionClosed(subConnection1));
+
+ subConnection2.subscribe(topics);
+
+ waitForBindings(0, ANYCAST_TOPIC, 1, 0, true);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 1, true);
+
+ waitForBindings(0, ANYCAST_TOPIC, 1, 1, false);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 0, false);
+
+ // Publish Messages
+ String payload1 = "This is message 1";
+ String payload2 = "This is message 2";
+ String payload3 = "This is message 3";
+
+ pubConnection.publish("anycast/test/1/some/la", payload1.getBytes(),
QoS.AT_LEAST_ONCE, false);
+ pubConnection.publish("anycast/test/1/some/la", payload2.getBytes(),
QoS.AT_MOST_ONCE, false);
+ pubConnection.publish("anycast/test/1/some/la", payload3.getBytes(),
QoS.AT_MOST_ONCE, false);
+
+ Message message1 = subConnection2.receive(5, TimeUnit.SECONDS);
+ message1.ack();
+ Message message2 = subConnection2.receive(5, TimeUnit.SECONDS);
+ message2.ack();
+ Message message3 = subConnection2.receive(5, TimeUnit.SECONDS);
+ message3.ack();
+
+ assertEquals(payload1, new String(message1.getPayload()));
+ assertEquals(payload2, new String(message2.getPayload()));
+ assertEquals(payload3, new String(message3.getPayload()));
+
+ subConnection2.unsubscribe(new String[]{ANYCAST_TOPIC});
+
+ waitForBindings(0, ANYCAST_TOPIC, 1, 0, true);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 0, true);
+
+ waitForBindings(0, ANYCAST_TOPIC, 1, 0, false);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 0, false);
+
+ pubConnection.publish("anycast/test/1/some/la", payload1.getBytes(),
QoS.AT_LEAST_ONCE, false);
+ pubConnection.publish("anycast/test/1/some/la", payload2.getBytes(),
QoS.AT_MOST_ONCE, false);
+ pubConnection.publish("anycast/test/1/some/la", payload3.getBytes(),
QoS.AT_MOST_ONCE, false);
+
+ Message message11 = subConnection2.receive(5, TimeUnit.SECONDS);
+ assertNull(message11);
+ Message message21 = subConnection2.receive(5, TimeUnit.SECONDS);
+ assertNull(message21);
+ Message message31 = subConnection2.receive(5, TimeUnit.SECONDS);
+ assertNull(message31);
+
+ } finally {
+ String[] topics = new String[]{ANYCAST_TOPIC};
+ if (subConnection1 != null && subConnection1.isConnected()) {
+ subConnection1.unsubscribe(topics);
+ subConnection1.disconnect();
+ }
+ if (subConnection2 != null && subConnection2.isConnected()) {
+ subConnection2.unsubscribe(topics);
+ subConnection2.disconnect();
+ }
+ if (pubConnection != null && pubConnection.isConnected()) {
+ pubConnection.disconnect();
+ }
+ }
+
+ }
- connection1 = retrieveMQTTConnection("tcp://localhost:61616");
- connection2 = retrieveMQTTConnection("tcp://localhost:61617");
+ @Test
+ public void useDiffClientIdAndAnycastSubscribeRemoteQueueWildCard() throws
Exception {
+ final String ANYCAST_TOPIC = "anycast/test/+/some/#";
+ final String clientId1 = "clientId1";
+ final String clientId2 = "clientId2";
+
+ setupServers(ANYCAST_TOPIC);
+
+ startServers(0, 1);
+
+ BlockingConnection connection1 = null;
+ BlockingConnection connection2 = null;
+ try {
+ //Waiting for resource initialization to complete
+ Thread.sleep(5000);
+ Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)};
+ connection1 = retrieveMQTTConnection("tcp://localhost:61616",
clientId1);
+ connection2 = retrieveMQTTConnection("tcp://localhost:61617",
clientId2);
// Subscribe to topics
- Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)};
connection1.subscribe(topics);
+
+ waitForBindings(0, ANYCAST_TOPIC, 1, 1, true);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 0, true);
+
+ waitForBindings(0, ANYCAST_TOPIC, 1, 0, false);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 1, false);
+
connection2.subscribe(topics);
- waitForBindings(0, TOPIC, 1, 1, true);
- waitForBindings(1, TOPIC, 1, 1, true);
+ waitForBindings(0, ANYCAST_TOPIC, 1, 1, true);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 1, true);
- waitForBindings(0, TOPIC, 1, 1, false);
- waitForBindings(1, TOPIC, 1, 1, false);
+ waitForBindings(0, ANYCAST_TOPIC, 1, 1, false);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 1, false);
// Publish Messages
@@ -157,9 +550,9 @@ public class MqttClusterRemoteSubscribeTest extends
ClusterTestBase {
String payload2 = "This is message 2";
String payload3 = "This is message 3";
- connection1.publish("test/1/some/la", payload1.getBytes(),
QoS.AT_LEAST_ONCE, false);
- connection1.publish("test/1/some/la", payload2.getBytes(),
QoS.AT_MOST_ONCE, false);
- connection1.publish("test/1/some/la", payload3.getBytes(),
QoS.AT_MOST_ONCE, false);
+ connection1.publish("anycast/test/1/some/la", payload1.getBytes(),
QoS.AT_LEAST_ONCE, false);
+ connection1.publish("anycast/test/1/some/la", payload2.getBytes(),
QoS.AT_MOST_ONCE, false);
+ connection1.publish("anycast/test/1/some/la", payload3.getBytes(),
QoS.AT_MOST_ONCE, false);
Message message1 = connection1.receive(5, TimeUnit.SECONDS);
@@ -174,11 +567,17 @@ public class MqttClusterRemoteSubscribeTest extends
ClusterTestBase {
assertEquals(payload3, new String(message3.getPayload()));
- connection2.unsubscribe(new String[]{TOPIC});
+ connection2.unsubscribe(new String[]{ANYCAST_TOPIC});
+
+ waitForBindings(0, ANYCAST_TOPIC, 1, 1, true);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 0, true);
- connection1.publish("test/1/some/la", payload1.getBytes(),
QoS.AT_LEAST_ONCE, false);
- connection1.publish("test/1/some/la", payload2.getBytes(),
QoS.AT_MOST_ONCE, false);
- connection1.publish("test/1/some/la", payload3.getBytes(),
QoS.AT_MOST_ONCE, false);
+ waitForBindings(0, ANYCAST_TOPIC, 1, 0, false);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 1, false);
+
+ connection1.publish("anycast/test/1/some/la", payload1.getBytes(),
QoS.AT_LEAST_ONCE, false);
+ connection1.publish("anycast/test/1/some/la", payload2.getBytes(),
QoS.AT_MOST_ONCE, false);
+ connection1.publish("anycast/test/1/some/la", payload3.getBytes(),
QoS.AT_MOST_ONCE, false);
Message message11 = connection1.receive(5, TimeUnit.SECONDS);
message11.ack();
@@ -187,22 +586,215 @@ public class MqttClusterRemoteSubscribeTest extends
ClusterTestBase {
Message message31 = connection1.receive(5, TimeUnit.SECONDS);
message31.ack();
+
String message11String = new String(message11.getPayload());
String message21String = new String(message21.getPayload());
String message31String = new String(message31.getPayload());
+ assertTrue(payload1.equals(message11String) ||
payload1.equals(message21String) || payload1.equals(message31String) );
+ assertTrue(payload2.equals(message11String) ||
payload2.equals(message21String) || payload2.equals(message31String) );
+ assertTrue(payload3.equals(message11String) ||
payload3.equals(message21String) || payload3.equals(message31String) );
- assertTrue(payload1.equals(message11String) ||
payload1.equals(message21String) || payload1.equals(message31String));
- assertTrue(payload2.equals(message11String) ||
payload2.equals(message21String) || payload2.equals(message31String));
- assertTrue(payload3.equals(message11String) ||
payload3.equals(message21String) || payload3.equals(message31String));
+ } finally {
+ String[] topics = new String[]{ANYCAST_TOPIC};
+ if (connection1 != null && connection1.isConnected()) {
+ connection1.unsubscribe(topics);
+ connection1.disconnect();
+ }
+ if (connection2 != null && connection2.isConnected()) {
+ connection2.unsubscribe(topics);
+ connection2.disconnect();
+ }
+ }
+
+ }
+
+ @Test
+ public void useSameClientIdAndMulticastSubscribeRemoteQueueWildCard()
throws Exception {
+ final String MULTICAST_TOPIC = "multicast/test/+/some/#";
+ final String ANYCAST_TOPIC = "anycast/test/+/some/#";
+ final String subClientId = "subClientId";
+ final String pubClientId = "pubClientId";
+
+ setupServers(ANYCAST_TOPIC);
+
+ startServers(0, 1);
+
+ BlockingConnection subConnection1 = null;
+ BlockingConnection subConnection2 = null;
+ BlockingConnection pubConnection = null;
+ try {
+ //Waiting for resource initialization to complete
+ Thread.sleep(5000);
+ Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)};
+ subConnection1 = retrieveMQTTConnection("tcp://localhost:61616",
subClientId);
+ subConnection2 = retrieveMQTTConnection("tcp://localhost:61617",
subClientId);
+ pubConnection = retrieveMQTTConnection("tcp://localhost:61616",
pubClientId);
+
+ //Waiting for the first sub connection to be closed
+ assertTrue(waitConnectionClosed(subConnection1));
+
+ subConnection2.subscribe(topics);
+
+ waitForBindings(0, MULTICAST_TOPIC, 0, 0, true);
+ waitForBindings(1, MULTICAST_TOPIC, 1, 1, true);
+
+ waitForBindings(0, MULTICAST_TOPIC, 1, 1, false);
+ waitForBindings(1, MULTICAST_TOPIC, 0, 0, false);
+
+ // Publish Messages
+ String payload1 = "This is message 1";
+ String payload2 = "This is message 2";
+ String payload3 = "This is message 3";
+
+ pubConnection.publish("multicast/test/1/some/la",
payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
+ pubConnection.publish("multicast/test/1/some/la",
payload2.getBytes(), QoS.AT_MOST_ONCE, false);
+ pubConnection.publish("multicast/test/1/some/la",
payload3.getBytes(), QoS.AT_MOST_ONCE, false);
+
+ Message message1 = subConnection2.receive(5, TimeUnit.SECONDS);
+ message1.ack();
+ Message message2 = subConnection2.receive(5, TimeUnit.SECONDS);
+ message2.ack();
+ Message message3 = subConnection2.receive(5, TimeUnit.SECONDS);
+ message3.ack();
+
+ assertEquals(payload1, new String(message1.getPayload()));
+ assertEquals(payload2, new String(message2.getPayload()));
+ assertEquals(payload3, new String(message3.getPayload()));
+
+ subConnection2.unsubscribe(new String[]{MULTICAST_TOPIC});
+
+ waitForBindings(0, MULTICAST_TOPIC, 0, 0, true);
+ waitForBindings(1, MULTICAST_TOPIC, 0, 0, true);
+
+ waitForBindings(0, MULTICAST_TOPIC, 0, 0, false);
+ waitForBindings(1, MULTICAST_TOPIC, 0, 0, false);
+
+ pubConnection.publish("multicast/test/1/some/la",
payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
+ pubConnection.publish("multicast/test/1/some/la",
payload2.getBytes(), QoS.AT_MOST_ONCE, false);
+ pubConnection.publish("multicast/test/1/some/la",
payload3.getBytes(), QoS.AT_MOST_ONCE, false);
+
+ Message message11 = subConnection2.receive(5, TimeUnit.SECONDS);
+ assertNull(message11);
+ Message message21 = subConnection2.receive(5, TimeUnit.SECONDS);
+ assertNull(message21);
+ Message message31 = subConnection2.receive(5, TimeUnit.SECONDS);
+ assertNull(message31);
+
+ } finally {
+ String[] topics = new String[]{MULTICAST_TOPIC};
+ if (subConnection1 != null && subConnection1.isConnected()) {
+ subConnection1.unsubscribe(topics);
+ subConnection1.disconnect();
+ }
+ if (subConnection2 != null && subConnection2.isConnected()) {
+ subConnection2.unsubscribe(topics);
+ subConnection2.disconnect();
+ }
+ if (pubConnection != null && pubConnection.isConnected()) {
+ pubConnection.disconnect();
+ }
+ }
+
+ }
+
+ @Test
+ public void useDiffClientIdAndMulticastSubscribeRemoteQueueWildCard()
throws Exception {
+ final String MULTICAST_TOPIC = "multicast/test/+/some/#";
+ final String ANYCAST_TOPIC = "anycast/test/+/some/#";
+ final String clientId1 = "clientId1";
+ final String clientId2 = "clientId2";
+
+ setupServers(ANYCAST_TOPIC);
+
+ startServers(0, 1);
+
+ BlockingConnection connection1 = null;
+ BlockingConnection connection2 = null;
+ try {
+ //Waiting for resource initialization to complete
+ Thread.sleep(5000);
+ Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)};
+ connection1 = retrieveMQTTConnection("tcp://localhost:61616",
clientId1);
+ connection2 = retrieveMQTTConnection("tcp://localhost:61617",
clientId2);
+ // Subscribe to topics
+ connection1.subscribe(topics);
+
+ waitForBindings(0, MULTICAST_TOPIC, 1, 1, true);
+ waitForBindings(1, MULTICAST_TOPIC, 0, 0, true);
+
+ waitForBindings(0, MULTICAST_TOPIC, 0, 0, false);
+ waitForBindings(1, MULTICAST_TOPIC, 1, 1, false);
+
+ connection2.subscribe(topics);
+
+ waitForBindings(0, MULTICAST_TOPIC, 1, 1, true);
+ waitForBindings(1, MULTICAST_TOPIC, 1, 1, true);
+
+ waitForBindings(0, MULTICAST_TOPIC, 1, 1, false);
+ waitForBindings(1, MULTICAST_TOPIC, 1, 1, false);
+
+ // Publish Messages
+ String payload1 = "This is message 1";
+ String payload2 = "This is message 2";
+ String payload3 = "This is message 3";
+
+ connection1.publish("multicast/test/1/some/la", payload1.getBytes(),
QoS.AT_LEAST_ONCE, false);
+ connection1.publish("multicast/test/1/some/la", payload2.getBytes(),
QoS.AT_MOST_ONCE, false);
+ connection1.publish("multicast/test/1/some/la", payload3.getBytes(),
QoS.AT_MOST_ONCE, false);
+
+ Message message11 = connection1.receive(5, TimeUnit.SECONDS);
+ message11.ack();
+ Message message12 = connection1.receive(5, TimeUnit.SECONDS);
+ message12.ack();
+ Message message13 = connection1.receive(5, TimeUnit.SECONDS);
+ message13.ack();
+
+ assertEquals(payload1, new String(message11.getPayload()));
+ assertEquals(payload2, new String(message12.getPayload()));
+ assertEquals(payload3, new String(message13.getPayload()));
+
+ Message message21 = connection2.receive(5, TimeUnit.SECONDS);
+ message21.ack();
+ Message message22 = connection2.receive(5, TimeUnit.SECONDS);
+ message22.ack();
+ Message message23 = connection2.receive(5, TimeUnit.SECONDS);
+ message23.ack();
+
+ assertEquals(payload1, new String(message21.getPayload()));
+ assertEquals(payload2, new String(message22.getPayload()));
+ assertEquals(payload3, new String(message23.getPayload()));
+
+ connection2.unsubscribe(new String[]{MULTICAST_TOPIC});
+
+ waitForBindings(0, MULTICAST_TOPIC, 1, 1, true);
+ waitForBindings(1, MULTICAST_TOPIC, 0, 0, true);
+
+ waitForBindings(0, MULTICAST_TOPIC, 0, 0, false);
+ waitForBindings(1, MULTICAST_TOPIC, 1, 1, false);
+
+ connection1.publish("multicast/test/1/some/la", payload1.getBytes(),
QoS.AT_LEAST_ONCE, false);
+ connection1.publish("multicast/test/1/some/la", payload2.getBytes(),
QoS.AT_MOST_ONCE, false);
+ connection1.publish("multicast/test/1/some/la", payload3.getBytes(),
QoS.AT_MOST_ONCE, false);
+
+ Message message31 = connection1.receive(5, TimeUnit.SECONDS);
+ message31.ack();
+ Message message32 = connection1.receive(5, TimeUnit.SECONDS);
+ message32.ack();
+ Message message33 = connection1.receive(5, TimeUnit.SECONDS);
+ message33.ack();
+
+ assertEquals(payload1, new String(message31.getPayload()));
+ assertEquals(payload2, new String(message32.getPayload()));
+ assertEquals(payload3, new String(message33.getPayload()));
} finally {
- String[] topics = new String[]{TOPIC};
- if (connection1 != null) {
+ String[] topics = new String[]{MULTICAST_TOPIC};
+ if (connection1 != null && connection1.isConnected()) {
connection1.unsubscribe(topics);
connection1.disconnect();
}
- if (connection2 != null) {
+ if (connection2 != null && connection2.isConnected()) {
connection2.unsubscribe(topics);
connection2.disconnect();
}
@@ -211,31 +803,39 @@ public class MqttClusterRemoteSubscribeTest extends
ClusterTestBase {
}
@Test
- public void unsubscribeRemoteQueueMultipleSubscriptions() throws Exception {
- final String TOPIC = "test/1/some/la";
+ public void useDiffClientIdSubscribeRemoteQueueMultipleSubscriptions()
throws Exception {
+ final String ANYCAST_TOPIC = "anycast/test/1/some/la";
final String TOPIC2 = "sample";
+ final String clientId1 = "clientId1";
+ final String clientId2 = "clientId2";
- setupServers(TOPIC);
+ setupServers(ANYCAST_TOPIC);
startServers(0, 1);
BlockingConnection connection1 = null;
BlockingConnection connection2 = null;
try {
-
- connection1 = retrieveMQTTConnection("tcp://localhost:61616");
- connection2 = retrieveMQTTConnection("tcp://localhost:61617");
+ //Waiting for resource initialization to complete
+ Thread.sleep(5000);
+ connection1 = retrieveMQTTConnection("tcp://localhost:61616",
clientId1);
+ connection2 = retrieveMQTTConnection("tcp://localhost:61617",
clientId2);
// Subscribe to topics
- connection1.subscribe(new Topic[]{new Topic(TOPIC,
QoS.AT_MOST_ONCE)});
- connection2.subscribe(new Topic[]{new Topic(TOPIC, QoS.AT_MOST_ONCE),
new Topic(TOPIC2, QoS.AT_MOST_ONCE)});
+ connection1.subscribe(new Topic[]{new Topic(ANYCAST_TOPIC,
QoS.AT_MOST_ONCE)});
- waitForBindings(0, TOPIC, 1, 1, true);
- waitForBindings(1, TOPIC, 1, 1, true);
+ waitForBindings(0, ANYCAST_TOPIC, 1, 1, true);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 0, true);
- waitForBindings(0, TOPIC, 1, 1, false);
- waitForBindings(1, TOPIC, 1, 1, false);
+ waitForBindings(0, ANYCAST_TOPIC, 1, 0, false);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 1, false);
+ connection2.subscribe(new Topic[]{new Topic(ANYCAST_TOPIC,
QoS.AT_MOST_ONCE), new Topic(TOPIC2, QoS.AT_MOST_ONCE)});
+ waitForBindings(0, ANYCAST_TOPIC, 1, 1, true);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 1, true);
+
+ waitForBindings(0, ANYCAST_TOPIC, 1, 1, false);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 1, false);
// Publish Messages
String payload1 = "This is message 1";
@@ -243,9 +843,9 @@ public class MqttClusterRemoteSubscribeTest extends
ClusterTestBase {
String payload3 = "This is message 3";
String payload4 = "This is message 4";
- connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE,
false);
- connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE,
false);
- connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE,
false);
+ connection1.publish(ANYCAST_TOPIC, payload1.getBytes(),
QoS.AT_LEAST_ONCE, false);
+ connection1.publish(ANYCAST_TOPIC, payload2.getBytes(),
QoS.AT_MOST_ONCE, false);
+ connection1.publish(ANYCAST_TOPIC, payload3.getBytes(),
QoS.AT_MOST_ONCE, false);
connection1.publish(TOPIC2, payload4.getBytes(), QoS.AT_MOST_ONCE,
false);
@@ -263,11 +863,17 @@ public class MqttClusterRemoteSubscribeTest extends
ClusterTestBase {
assertEquals(payload3, new String(message3.getPayload()));
assertEquals(payload4, new String(message4.getPayload()));
- connection2.unsubscribe(new String[]{TOPIC});
+ connection2.unsubscribe(new String[]{ANYCAST_TOPIC});
+
+ waitForBindings(0, ANYCAST_TOPIC, 1, 1, true);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 0, true);
+
+ waitForBindings(0, ANYCAST_TOPIC, 1, 0, false);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 1, false);
- connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE,
false);
- connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE,
false);
- connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE,
false);
+ connection1.publish(ANYCAST_TOPIC, payload1.getBytes(),
QoS.AT_LEAST_ONCE, false);
+ connection1.publish(ANYCAST_TOPIC, payload2.getBytes(),
QoS.AT_MOST_ONCE, false);
+ connection1.publish(ANYCAST_TOPIC, payload3.getBytes(),
QoS.AT_MOST_ONCE, false);
connection1.publish(TOPIC2, payload4.getBytes(), QoS.AT_MOST_ONCE,
false);
Message message11 = connection1.receive(5, TimeUnit.SECONDS);
@@ -289,12 +895,12 @@ public class MqttClusterRemoteSubscribeTest extends
ClusterTestBase {
} finally {
- if (connection1 != null) {
- connection1.unsubscribe(new String[]{TOPIC});
+ if (connection1 != null && connection1.isConnected()) {
+ connection1.unsubscribe(new String[]{ANYCAST_TOPIC});
connection1.disconnect();
}
- if (connection2 != null) {
- connection2.unsubscribe(new String[]{TOPIC, TOPIC2});
+ if (connection2 != null && connection2.isConnected()) {
+ connection2.unsubscribe(new String[]{ANYCAST_TOPIC, TOPIC2});
connection2.disconnect();
}
}
@@ -302,33 +908,258 @@ public class MqttClusterRemoteSubscribeTest extends
ClusterTestBase {
}
@Test
- public void unsubscribeExistingQueue() throws Exception {
- final String TOPIC = "test/1/some/la";
+ public void useSameClientIdSubscribeRemoteQueueMultipleSubscriptions()
throws Exception {
+ // Scenario: two subscriber connections share one client id and connect to
+ // the acceptors on ports 61616 and 61617. The test requires the first
+ // connection to be closed once the second one is connected, then checks
+ // that the surviving connection receives messages sent by a separate
+ // publisher connection, and stops receiving them for a topic after it
+ // unsubscribes from that topic.
+ final String ANYCAST_TOPIC = "anycast/test/1/some/la";
+ final String TOPIC2 = "sample";
+ final String subClientId = "subClientId";
+ final String pubClientId = "pubClientId";
+
+ setupServers(ANYCAST_TOPIC);
+
+ startServers(0, 1);
+
+ BlockingConnection subConnection1 = null;
+ BlockingConnection subConnection2 = null;
+ BlockingConnection pubConnection = null;
+ try {
+ // Fixed pause to let the started servers finish initializing
+ Thread.sleep(5000);
+ subConnection1 = retrieveMQTTConnection("tcp://localhost:61616",
subClientId);
+ subConnection2 = retrieveMQTTConnection("tcp://localhost:61617",
subClientId);
+ pubConnection = retrieveMQTTConnection("tcp://localhost:61616",
pubClientId);
+
+ // Wait for the first subscriber connection to be closed
+ assertTrue(waitConnectionClosed(subConnection1));
+
+ subConnection2.subscribe(new Topic[]{new Topic(ANYCAST_TOPIC,
QoS.AT_MOST_ONCE), new Topic(TOPIC2, QoS.AT_MOST_ONCE)});
+
+ // NOTE(review): waitForBindings arguments look like (node, address,
+ // binding count, consumer count, local) - confirm against ClusterTestBase
+ waitForBindings(0, ANYCAST_TOPIC, 1, 0, true);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 1, true);
+
+ waitForBindings(0, ANYCAST_TOPIC, 1, 1, false);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 0, false);
+
+ // Publish Messages
+ String payload1 = "This is message 1";
+ String payload2 = "This is message 2";
+ String payload3 = "This is message 3";
+ String payload4 = "This is message 4";
+
+ pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(),
QoS.AT_LEAST_ONCE, false);
+ pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(),
QoS.AT_MOST_ONCE, false);
+ pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(),
QoS.AT_MOST_ONCE, false);
+ pubConnection.publish(TOPIC2, payload4.getBytes(), QoS.AT_MOST_ONCE,
false);
+
+ Message message1 = subConnection2.receive(5, TimeUnit.SECONDS);
+ message1.ack();
+ Message message2 = subConnection2.receive(5, TimeUnit.SECONDS);
+ message2.ack();
+ Message message3 = subConnection2.receive(5, TimeUnit.SECONDS);
+ message3.ack();
+ Message message4 = subConnection2.receive(5, TimeUnit.SECONDS);
+ message4.ack();
+
+ String messageStr1 = new String(message1.getPayload());
+ String messageStr2 = new String(message2.getPayload());
+ String messageStr3 = new String(message3.getPayload());
+ String messageStr4 = new String(message4.getPayload());
+ // Arrival order is not asserted: each payload only has to match one of
+ // the four received messages
+ assertTrue(payload1.equals(messageStr1) ||
payload1.equals(messageStr2) || payload1.equals(messageStr3) ||
payload1.equals(messageStr4));
+ assertTrue(payload2.equals(messageStr1) ||
payload2.equals(messageStr2) || payload2.equals(messageStr3) ||
payload2.equals(messageStr4));
+ assertTrue(payload3.equals(messageStr1) ||
payload3.equals(messageStr2) || payload3.equals(messageStr3) ||
payload3.equals(messageStr4));
+ assertTrue(payload4.equals(messageStr1) ||
payload4.equals(messageStr2) || payload4.equals(messageStr3) ||
payload4.equals(messageStr4));
+
+ subConnection2.unsubscribe(new String[]{ANYCAST_TOPIC});
+
+ waitForBindings(0, ANYCAST_TOPIC, 1, 0, true);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 0, true);
+
+ waitForBindings(0, ANYCAST_TOPIC, 1, 0, false);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 0, false);
+
+ pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(),
QoS.AT_LEAST_ONCE, false);
+ pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(),
QoS.AT_MOST_ONCE, false);
+ pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(),
QoS.AT_MOST_ONCE, false);
+ pubConnection.publish(TOPIC2, payload4.getBytes(), QoS.AT_MOST_ONCE,
false);
+
+ // The TOPIC2 subscription is still active, so exactly the TOPIC2
+ // message (payload4) is expected; the next three receives must return
+ // null
+ Message message11 = subConnection2.receive(5, TimeUnit.SECONDS);
+ message11.ack();
+ assertEquals(payload4, new String(message11.getPayload()));
+
+ Message message21 = subConnection2.receive(5, TimeUnit.SECONDS);
+ assertNull(message21);
+ Message message31 = subConnection2.receive(5, TimeUnit.SECONDS);
+ assertNull(message31);
+ Message message41 = subConnection2.receive(5, TimeUnit.SECONDS);
+ assertNull(message41);
+
+ } finally {
+ // Only connections that still report themselves as connected are
+ // cleaned up; the test body required subConnection1 to be closed
+ if (subConnection1 != null && subConnection1.isConnected()) {
+ subConnection1.unsubscribe(new String[]{ANYCAST_TOPIC});
+ subConnection1.disconnect();
+ }
+ if (subConnection2 != null && subConnection2.isConnected()) {
+ subConnection2.unsubscribe(new String[]{ANYCAST_TOPIC, TOPIC2});
+ subConnection2.disconnect();
+ }
+ if (pubConnection != null && pubConnection.isConnected()) {
+ pubConnection.disconnect();
+ }
+ }
+
+ }
+
+   /**
+    * Verifies that each new MQTT connection using an already-connected client id causes the
+    * previous connection with that client id to be closed, and that only the last connection
+    * receives messages.
+    * <p>
+    * Three subscriber connections sharing {@code subClientId} are opened one after another
+    * (port 61616, then port 61617 twice). The first must be closed after the second connects,
+    * and the second must be closed after the third connects. The third connection then
+    * subscribes, must receive the four published messages in publication order, unsubscribes,
+    * and must receive nothing from a further round of publishes.
+    *
+    * @throws Exception if server setup, an MQTT operation or a wait fails
+    */
+   @Test
+   public void useSameClientIdSubscribeExistingQueue() throws Exception {
+      final String ANYCAST_TOPIC = "anycast/test/1/some/la";
+      final String subClientId = "subClientId";
+      final String pubClientId = "pubClientId";
+
+      setupServers(ANYCAST_TOPIC);
+
+      startServers(0, 1);
+
+      BlockingConnection subConnection1 = null;
+      BlockingConnection subConnection2 = null;
+      BlockingConnection subConnection3 = null;
+      BlockingConnection pubConnection = null;
+      try {
+         // Fixed pause to let the started servers finish initializing
+         Thread.sleep(5000);
+         pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId);
+
+         subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId);
+         subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId);
+
+         // The second connection reuses the client id, so the first one must be closed
+         assertTrue(waitConnectionClosed(subConnection1));
+
+         subConnection3 = retrieveMQTTConnection("tcp://localhost:61617", subClientId);
+
+         // The third connection reuses the client id again, so now the second one must be
+         // closed. Checking subConnection1 here would pass trivially because it is already closed.
+         assertTrue(waitConnectionClosed(subConnection2));
+
+         // Subscribe to topics
+         Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)};
+
+         subConnection3.subscribe(topics);
+
+         // NOTE(review): waitForBindings arguments look like (node, address, binding count,
+         // consumer count, local) - confirm against ClusterTestBase
+         waitForBindings(0, ANYCAST_TOPIC, 1, 0, true);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 1, true);
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 1, false);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 0, false);
+
+         // Publish Messages
+         String payload1 = "This is message 1";
+         String payload2 = "This is message 2";
+         String payload3 = "This is message 3";
+         String payload4 = "This is message 4";
+
+         pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
+         pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false);
+         pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false);
+         pubConnection.publish(ANYCAST_TOPIC, payload4.getBytes(), QoS.AT_MOST_ONCE, false);
+
+         Message message1 = subConnection3.receive(5, TimeUnit.SECONDS);
+         message1.ack();
+         Message message2 = subConnection3.receive(5, TimeUnit.SECONDS);
+         message2.ack();
+         Message message3 = subConnection3.receive(5, TimeUnit.SECONDS);
+         message3.ack();
+         Message message4 = subConnection3.receive(5, TimeUnit.SECONDS);
+         message4.ack();
+
+         // The sole subscriber must see the messages in publication order
+         assertEquals(payload1, new String(message1.getPayload()));
+         assertEquals(payload2, new String(message2.getPayload()));
+         assertEquals(payload3, new String(message3.getPayload()));
+         assertEquals(payload4, new String(message4.getPayload()));
+
+         subConnection3.unsubscribe(new String[]{ANYCAST_TOPIC});
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 0, true);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 0, true);
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 0, false);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 0, false);
+
+         pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
+         pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false);
+         pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false);
+
+         // No subscription is left, so every receive must time out and return null
+         Message message11 = subConnection3.receive(5, TimeUnit.SECONDS);
+         assertNull(message11);
+         Message message21 = subConnection3.receive(5, TimeUnit.SECONDS);
+         assertNull(message21);
+         Message message31 = subConnection3.receive(5, TimeUnit.SECONDS);
+         assertNull(message31);
+
+      } finally {
+         // Only connections that still report themselves as connected are cleaned up
+         String[] topics = new String[]{ANYCAST_TOPIC};
+         if (subConnection1 != null && subConnection1.isConnected()) {
+            subConnection1.unsubscribe(topics);
+            subConnection1.disconnect();
+         }
+         if (subConnection2 != null && subConnection2.isConnected()) {
+            subConnection2.unsubscribe(topics);
+            subConnection2.disconnect();
+         }
+         if (subConnection3 != null && subConnection3.isConnected()) {
+            subConnection3.unsubscribe(topics);
+            subConnection3.disconnect();
+         }
+         // The publisher never subscribes in this test, so it only needs to disconnect
+         if (pubConnection != null && pubConnection.isConnected()) {
+            pubConnection.disconnect();
+         }
+      }
+
+   }
+
+ @Test
+ public void useDiffClientIdSubscribeExistingQueue() throws Exception {
+ final String ANYCAST_TOPIC = "anycast/test/1/some/la";
+ final String clientId1 = "clientId1";
+ final String clientId2 = "clientId2";
+ final String clientId3 = "clientId3";
- setupServers(TOPIC);
+ setupServers(ANYCAST_TOPIC);
startServers(0, 1);
BlockingConnection connection1 = null;
BlockingConnection connection2 = null;
BlockingConnection connection3 = null;
try {
-
- connection1 = retrieveMQTTConnection("tcp://localhost:61616");
- connection2 = retrieveMQTTConnection("tcp://localhost:61617");
- connection3 = retrieveMQTTConnection("tcp://localhost:61617");
+ //Waiting for resource initialization to complete
+ Thread.sleep(5000);
+ connection1 = retrieveMQTTConnection("tcp://localhost:61616",
clientId1);
+ connection2 = retrieveMQTTConnection("tcp://localhost:61617",
clientId2);
+ connection3 = retrieveMQTTConnection("tcp://localhost:61617",
clientId3);
// Subscribe to topics
- Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)};
+ Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)};
connection1.subscribe(topics);
+
+ waitForBindings(0, ANYCAST_TOPIC, 1, 1, true);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 0, true);
+
+ waitForBindings(0, ANYCAST_TOPIC, 1, 0, false);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 1, false);
+
connection2.subscribe(topics);
- connection3.subscribe(topics);
+ waitForBindings(0, ANYCAST_TOPIC, 1, 1, true);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 1, true);
+
+ waitForBindings(0, ANYCAST_TOPIC, 1, 1, false);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 1, false);
- waitForBindings(0, TOPIC, 1, 1, true);
- waitForBindings(1, TOPIC, 1, 2, true);
+ connection3.subscribe(topics);
- waitForBindings(0, TOPIC, 1, 2, false);
- waitForBindings(1, TOPIC, 1, 1, false);
+ waitForBindings(0, ANYCAST_TOPIC, 1, 1, true);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 2, true);
+ waitForBindings(0, ANYCAST_TOPIC, 1, 2, false);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 1, false);
// Publish Messages
String payload1 = "This is message 1";
@@ -336,10 +1167,10 @@ public class MqttClusterRemoteSubscribeTest extends
ClusterTestBase {
String payload3 = "This is message 3";
String payload4 = "This is message 4";
- connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE,
false);
- connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE,
false);
- connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE,
false);
- connection1.publish(TOPIC, payload4.getBytes(), QoS.AT_MOST_ONCE,
false);
+ connection1.publish(ANYCAST_TOPIC, payload1.getBytes(),
QoS.AT_LEAST_ONCE, false);
+ connection1.publish(ANYCAST_TOPIC, payload2.getBytes(),
QoS.AT_MOST_ONCE, false);
+ connection1.publish(ANYCAST_TOPIC, payload3.getBytes(),
QoS.AT_MOST_ONCE, false);
+ connection1.publish(ANYCAST_TOPIC, payload4.getBytes(),
QoS.AT_MOST_ONCE, false);
Message message1 = connection1.receive(5, TimeUnit.SECONDS);
message1.ack();
@@ -355,12 +1186,17 @@ public class MqttClusterRemoteSubscribeTest extends
ClusterTestBase {
assertEquals(payload3, new String(message3.getPayload()));
assertEquals(payload4, new String(message4.getPayload()));
+ connection2.unsubscribe(new String[]{ANYCAST_TOPIC});
- connection2.unsubscribe(new String[]{TOPIC});
+ waitForBindings(0, ANYCAST_TOPIC, 1, 1, true);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 1, true);
- connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE,
false);
- connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE,
false);
- connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE,
false);
+ waitForBindings(0, ANYCAST_TOPIC, 1, 1, false);
+ waitForBindings(1, ANYCAST_TOPIC, 1, 1, false);
+
+ connection1.publish(ANYCAST_TOPIC, payload1.getBytes(),
QoS.AT_LEAST_ONCE, false);
+ connection1.publish(ANYCAST_TOPIC, payload2.getBytes(),
QoS.AT_MOST_ONCE, false);
+ connection1.publish(ANYCAST_TOPIC, payload3.getBytes(),
QoS.AT_MOST_ONCE, false);
Message message11 = connection1.receive(5, TimeUnit.SECONDS);
message11.ack();
@@ -377,16 +1213,16 @@ public class MqttClusterRemoteSubscribeTest extends
ClusterTestBase {
assertTrue(payload3.equals(message11String) ||
payload3.equals(message21String) || payload3.equals(message31String));
} finally {
- String[] topics = new String[]{TOPIC};
- if (connection1 != null) {
+ String[] topics = new String[]{ANYCAST_TOPIC};
+ if (connection1 != null && connection1.isConnected()) {
connection1.unsubscribe(topics);
connection1.disconnect();
}
- if (connection2 != null) {
+ if (connection2 != null && connection2.isConnected()) {
connection2.unsubscribe(topics);
connection2.disconnect();
}
- if (connection3 != null) {
+ if (connection3 != null && connection3.isConnected()) {
connection3.unsubscribe(topics);
connection3.disconnect();
}
@@ -395,9 +1231,12 @@ public class MqttClusterRemoteSubscribeTest extends
ClusterTestBase {
}
- private static BlockingConnection retrieveMQTTConnection(String host)
throws Exception {
+ private static BlockingConnection retrieveMQTTConnection(String host,
String clientId) throws Exception {
MQTT mqtt = new MQTT();
mqtt.setHost(host);
+ mqtt.setClientId(clientId);
+ mqtt.setConnectAttemptsMax(0);
+ mqtt.setReconnectAttemptsMax(0);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
return connection;
@@ -450,4 +1289,7 @@ public class MqttClusterRemoteSubscribeTest extends
ClusterTestBase {
return wildcardConfiguration;
}
+ private boolean waitConnectionClosed(BlockingConnection connection) throws
Exception {
+ return Wait.waitFor(() -> !connection.isConnected());
+ }
}