This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 677d71b8e7 ARTEMIS-4366 Missing Mirrored ACKs with MULTICAST and
subscriptions
677d71b8e7 is described below
commit 677d71b8e7b3bb0afd2fd618af4b86657324e50b
Author: Clebert Suconic <[email protected]>
AuthorDate: Fri Jul 14 16:49:31 2023 -0400
ARTEMIS-4366 Missing Mirrored ACKs with MULTICAST and subscriptions
---
.../artemis/cli/commands/AbstractAction.java | 17 +-
.../utils/collections/NodeStoreFactory.java | 21 ++
.../api/core/management/ManagementHelper.java | 37 ++++
.../amqp/broker/ProtonProtocolManager.java | 8 +-
.../connect/mirror/AMQPMirrorControllerSource.java | 6 +-
.../connect/mirror/AMQPMirrorControllerTarget.java | 12 +-
.../amqp/connect/mirror/ReferenceNodeStore.java | 41 +---
.../connect/mirror/ReferenceNodeStoreFactory.java | 82 ++++++++
.../apache/activemq/artemis/core/server/Queue.java | 3 +-
.../artemis/core/server/impl/QueueImpl.java | 9 +-
.../core/server/impl/RoutingContextTest.java | 4 +-
.../server/impl/ScheduledDeliveryHandlerTest.java | 4 +-
.../integration/amqp/connect/AMQPReplicaTest.java | 221 ++++++++++++++++++++
tests/smoke-tests/pom.xml | 32 +++
.../mirrored-subscriptions/broker1/broker.xml | 226 +++++++++++++++++++++
.../mirrored-subscriptions/broker2/broker.xml | 220 ++++++++++++++++++++
.../brokerConnection/MirroredSubscriptionTest.java | 164 +++++++++++++++
.../tests/smoke/common/SimpleManagement.java | 68 +++++++
.../tests/unit/core/postoffice/impl/FakeQueue.java | 4 +-
19 files changed, 1107 insertions(+), 72 deletions(-)
diff --git
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java
index 37f08c35f1..3eebaf4e2c 100644
---
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java
+++
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.cli.commands;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
-import org.apache.activemq.artemis.api.core.client.ClientRequestor;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
@@ -28,25 +27,13 @@ import
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
public abstract class AbstractAction extends ConnectionAbstract {
+ // TODO: This call could be replaced by a direct call into
ManagementHelpr.doManagement and their lambdas
public void performCoreManagement(ManagementCallback<ClientMessage> cb)
throws Exception {
-
try (ActiveMQConnectionFactory factory = createCoreConnectionFactory();
ServerLocator locator = factory.getServerLocator();
ClientSessionFactory sessionFactory =
locator.createSessionFactory();
ClientSession session = sessionFactory.createSession(user,
password, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)) {
- session.start();
- ClientRequestor requestor = new ClientRequestor(session,
"activemq.management");
- ClientMessage message = session.createMessage(false);
-
- cb.setUpInvocation(message);
-
- ClientMessage reply = requestor.request(message);
-
- if (ManagementHelper.hasOperationSucceeded(reply)) {
- cb.requestSuccessful(reply);
- } else {
- cb.requestFailed(reply);
- }
+ ManagementHelper.doManagement(session, cb::setUpInvocation,
cb::requestSuccessful, cb::requestFailed);
}
}
diff --git
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/NodeStoreFactory.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/NodeStoreFactory.java
new file mode 100644
index 0000000000..2bd6c9c292
--- /dev/null
+++
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/NodeStoreFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+public interface NodeStoreFactory<E> {
+ NodeStore<E> newNodeStore();
+}
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
index 678df28625..69a16a8f4b 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
@@ -16,6 +16,13 @@
*/
package org.apache.activemq.artemis.api.core.management;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientRequestor;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.json.JsonArray;
import org.apache.activemq.artemis.api.core.ICoreMessage;
@@ -87,6 +94,36 @@ public final class ManagementHelper {
public static final SimpleString HDR_CLIENT_ID = new
SimpleString("_AMQ_Client_ID");
+ // Lambda declaration for management function. Pretty much same thing as
java.util.function.Consumer but with an exception in the declaration that was
needed.
+ public interface MessageAcceptor {
+ void accept(ClientMessage message) throws Exception;
+ }
+
+ /** Utility function to connect to a server and perform a management
operation via core. */
+ public static void doManagement(String uri, String user, String password,
MessageAcceptor setup, MessageAcceptor ok, MessageAcceptor failed) throws
Exception {
+ try (ServerLocator locator = ServerLocatorImpl.newLocator(uri);
+ ClientSessionFactory sessionFactory =
locator.createSessionFactory();
+ ClientSession session = sessionFactory.createSession(user,
password, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)) {
+ doManagement(session, setup, ok, failed);
+ }
+ }
+
+ /** Utility function to reuse a ClientSessionConnection and perform a
single management operation via core. */
+ public static void doManagement(ClientSession session, MessageAcceptor
setup, MessageAcceptor ok, MessageAcceptor failed) throws Exception {
+ session.start();
+ ClientRequestor requestor = new ClientRequestor(session,
"activemq.management");
+ ClientMessage message = session.createMessage(false);
+
+ setup.accept(message);
+
+ ClientMessage reply = requestor.request(message);
+
+ if (ManagementHelper.hasOperationSucceeded(reply)) {
+ ok.accept(reply);
+ } else {
+ failed.accept(reply);
+ }
+ }
/**
* Stores a resource attribute in a message to retrieve the value from the
server resource.
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index a34184ae6b..9355bbdff1 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -33,7 +33,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
import
org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
-import
org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceNodeStore;
+import
org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceNodeStoreFactory;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPRoutingHandler;
@@ -75,7 +75,7 @@ public class ProtonProtocolManager extends
AbstractProtocolManager<AMQPMessage,
// We must use one referenceIDSupplier per server.
// protocol manager is the perfect aggregation for that.
- private ReferenceNodeStore referenceIDSupplier;
+ private ReferenceNodeStoreFactory referenceIDSupplier;
private final ProtonProtocolManagerFactory factory;
@@ -125,11 +125,11 @@ public class ProtonProtocolManager extends
AbstractProtocolManager<AMQPMessage,
routingHandler = new AMQPRoutingHandler(server);
}
- public synchronized ReferenceNodeStore getReferenceIDSupplier() {
+ public synchronized ReferenceNodeStoreFactory getReferenceIDSupplier() {
if (referenceIDSupplier == null) {
// we lazy start the instance.
// only create it when needed
- referenceIDSupplier = new ReferenceNodeStore(server);
+ referenceIDSupplier = new ReferenceNodeStoreFactory(server);
}
return referenceIDSupplier;
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index 160e44c1aa..6d943bf2d6 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -92,7 +92,7 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
final Queue snfQueue;
final ActiveMQServer server;
- final ReferenceNodeStore idSupplier;
+ final ReferenceNodeStoreFactory idSupplier;
final boolean acks;
final boolean addQueues;
final boolean deleteQueues;
@@ -324,14 +324,14 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
}
}
- public static void validateProtocolData(ReferenceNodeStore
referenceIDSupplier, MessageReference ref, SimpleString snfAddress) {
+ public static void validateProtocolData(ReferenceNodeStoreFactory
referenceIDSupplier, MessageReference ref, SimpleString snfAddress) {
if (ref.getProtocolData(DeliveryAnnotations.class) == null &&
!ref.getMessage().getAddressSimpleString().equals(snfAddress)) {
setProtocolData(referenceIDSupplier, ref);
}
}
/** This method will return the brokerID used by the message */
- private static String setProtocolData(ReferenceNodeStore
referenceIDSupplier, MessageReference ref) {
+ private static String setProtocolData(ReferenceNodeStoreFactory
referenceIDSupplier, MessageReference ref) {
String brokerID = referenceIDSupplier.getServerID(ref);
long id = referenceIDSupplier.getID(ref);
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
index 55f35bf645..fa168005a0 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
@@ -162,7 +162,7 @@ public class AMQPMirrorControllerTarget extends
ProtonAbstractReceiver implement
DuplicateIDCache lruduplicateIDCache;
String lruDuplicateIDKey;
- private final ReferenceNodeStore referenceNodeStore;
+ private final ReferenceNodeStoreFactory referenceNodeStore;
OperationContext mirrorContext;
@@ -367,15 +367,17 @@ public class AMQPMirrorControllerTarget extends
ProtonAbstractReceiver implement
}
private void performAck(String nodeID, long messageID, Queue targetQueue,
ACKMessageOperation ackMessageOperation, AckReason reason, final short retry) {
- MessageReference reference = targetQueue.removeWithSuppliedID(nodeID,
messageID, referenceNodeStore);
if (logger.isTraceEnabled()) {
- logger.trace("performAck (nodeID={}, messageID={}), targetQueue={}).
Ref={}", nodeID, messageID, targetQueue.getName(), reference);
+ logger.trace("performAck (nodeID={}, messageID={}), targetQueue={})",
nodeID, messageID, targetQueue.getName());
}
+ MessageReference reference = targetQueue.removeWithSuppliedID(nodeID,
messageID, referenceNodeStore);
+
+
if (reference == null) {
if (logger.isDebugEnabled()) {
- logger.debug("Retrying Reference not found on messageID={},
nodeID={}, currentRetry={}", messageID, nodeID, retry);
+ logger.debug("Retrying Reference not found on messageID={},
nodeID={}, queue={}. currentRetry={}", messageID, nodeID, targetQueue, retry);
}
switch (retry) {
case 0:
@@ -404,7 +406,7 @@ public class AMQPMirrorControllerTarget extends
ProtonAbstractReceiver implement
if (reference != null) {
if (logger.isTraceEnabled()) {
- logger.trace("Post ack Server {} worked well for messageID={}
nodeID={}", server, messageID, nodeID);
+ logger.trace("Post ack Server {} worked well for messageID={}
nodeID={} queue={}, targetQueue={}", server, messageID, nodeID,
reference.getQueue(), targetQueue);
}
try {
switch (reason) {
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java
index d82560cace..a9ae71a273 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java
@@ -20,20 +20,16 @@ import java.util.HashMap;
import io.netty.util.collection.LongObjectHashMap;
import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.utils.collections.NodeStore;
import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
-import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
-import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
-
public class ReferenceNodeStore implements NodeStore<MessageReference> {
- private final String serverID;
+ private final ReferenceNodeStoreFactory factory;
- public ReferenceNodeStore(ActiveMQServer server) {
- this.serverID = server.getNodeID().toString();
+ public ReferenceNodeStore(ReferenceNodeStoreFactory factory) {
+ this.factory = factory;
}
// This is where the messages are stored by server id...
@@ -43,10 +39,6 @@ public class ReferenceNodeStore implements
NodeStore<MessageReference> {
LongObjectHashMap<LinkedListImpl.Node<MessageReference>> lruMap;
- public String getDefaultNodeID() {
- return serverID;
- }
-
@Override
public void storeNode(MessageReference element,
LinkedListImpl.Node<MessageReference> node) {
String list = getServerID(element);
@@ -90,7 +82,7 @@ public class ReferenceNodeStore implements
NodeStore<MessageReference> {
/** notice getMap should always return an instance. It should never return
null. */
private synchronized
LongObjectHashMap<LinkedListImpl.Node<MessageReference>> getMap(String
serverID) {
if (serverID == null) {
- serverID = this.serverID; // returning for the localList in case it's
null
+ serverID = factory.getDefaultNodeID();
}
if (lruListID != null && lruListID.equals(serverID)) {
@@ -113,34 +105,15 @@ public class ReferenceNodeStore implements
NodeStore<MessageReference> {
}
public String getServerID(MessageReference element) {
- return getServerID(element.getMessage());
+ return factory.getServerID(element);
}
-
public String getServerID(Message message) {
- Object nodeID =
message.getBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY);
- if (nodeID != null) {
- return nodeID.toString();
- } else {
- // it is important to return null here, as the MirrorSource is
expecting it to be null
- // in the case the nodeID being from the originating server.
- // don't be tempted to return this.serverID here.
- return null;
- }
+ return factory.getServerID(message);
}
public long getID(MessageReference element) {
- Message message = element.getMessage();
- Long id = getID(message);
- if (id == null) {
- return element.getMessageID();
- } else {
- return id;
- }
- }
-
- private Long getID(Message message) {
- return (Long)message.getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY);
+ return factory.getID(element);
}
@Override
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStoreFactory.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStoreFactory.java
new file mode 100644
index 0000000000..2782f85175
--- /dev/null
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStoreFactory.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.utils.collections.NodeStore;
+import org.apache.activemq.artemis.utils.collections.NodeStoreFactory;
+
+import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
+
+public class ReferenceNodeStoreFactory implements
NodeStoreFactory<MessageReference> {
+
+ final ActiveMQServer server;
+
+ private final String serverID;
+
+ public ReferenceNodeStoreFactory(ActiveMQServer server) {
+ this.server = server;
+ this.serverID = server.getNodeID().toString();
+
+ }
+
+ @Override
+ public NodeStore<MessageReference> newNodeStore() {
+ return new ReferenceNodeStore(this);
+ }
+
+ public String getDefaultNodeID() {
+ return serverID;
+ }
+
+ public String getServerID(MessageReference element) {
+ return getServerID(element.getMessage());
+ }
+
+
+ public String getServerID(Message message) {
+ Object nodeID =
message.getBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY);
+ if (nodeID != null) {
+ return nodeID.toString();
+ } else {
+ // it is important to return null here, as the MirrorSource is
expecting it to be null
+ // in the case the nodeID being from the originating server.
+ // don't be tempted to return this.serverID here.
+ return null;
+ }
+ }
+
+ public long getID(MessageReference element) {
+ Message message = element.getMessage();
+ Long id = getID(message);
+ if (id == null) {
+ return element.getMessageID();
+ } else {
+ return id;
+ }
+ }
+
+ private Long getID(Message message) {
+ return (Long)message.getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY);
+ }
+
+
+}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 2c1319bcd4..e9042a9913 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -37,6 +37,7 @@ import
org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ReferenceCounter;
import org.apache.activemq.artemis.utils.collections.NodeStore;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.NodeStoreFactory;
import org.apache.activemq.artemis.utils.critical.CriticalComponent;
public interface Queue extends Bindable,CriticalComponent {
@@ -77,7 +78,7 @@ public interface Queue extends Bindable,CriticalComponent {
* If the idSupplier returns {@literal < 0} the ID is considered a non
value (null) and it will be ignored.
*
* @see
org.apache.activemq.artemis.utils.collections.LinkedList#setNodeStore(NodeStore)
*/
- MessageReference removeWithSuppliedID(String serverID, long id,
NodeStore<MessageReference> nodeStore);
+ MessageReference removeWithSuppliedID(String serverID, long id,
NodeStoreFactory<MessageReference> nodeStore);
/**
* The queue definition could be durable, but the messages could eventually
be considered non durable.
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 2ff193595b..9f948355d9 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -116,6 +116,7 @@ import
org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.NodeStore;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.NodeStoreFactory;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
@@ -219,9 +220,9 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
private NodeStore<MessageReference> nodeStore;
- private void checkIDSupplier(NodeStore<MessageReference> nodeStore) {
- if (this.nodeStore != nodeStore) {
- this.nodeStore = nodeStore;
+ private void checkIDSupplier(NodeStoreFactory<MessageReference>
nodeStoreFactory) {
+ if (this.nodeStore == null) {
+ this.nodeStore = nodeStoreFactory.newNodeStore();
messageReferences.setNodeStore(nodeStore);
}
}
@@ -3457,7 +3458,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
}
@Override
- public synchronized MessageReference removeWithSuppliedID(String serverID,
long id, NodeStore<MessageReference> nodeStore) {
+ public synchronized MessageReference removeWithSuppliedID(String serverID,
long id, NodeStoreFactory<MessageReference> nodeStore) {
checkIDSupplier(nodeStore);
MessageReference reference = messageReferences.removeWithID(serverID,
id);
if (reference != null) {
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java
index 90cd723c48..09c96e8745 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java
@@ -41,7 +41,7 @@ import
org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.ReferenceCounter;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
-import org.apache.activemq.artemis.utils.collections.NodeStore;
+import org.apache.activemq.artemis.utils.collections.NodeStoreFactory;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.apache.activemq.artemis.utils.critical.CriticalCloseable;
import org.junit.Assert;
@@ -157,7 +157,7 @@ public class RoutingContextTest {
}
@Override
- public MessageReference removeWithSuppliedID(String serverID, long id,
NodeStore<MessageReference> nodeStore) {
+ public MessageReference removeWithSuppliedID(String serverID, long id,
NodeStoreFactory<MessageReference> nodeStore) {
return null;
}
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 5e43510f8c..e4cd2acbe6 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -56,8 +56,8 @@ import
org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.ReferenceCounter;
import org.apache.activemq.artemis.utils.UUID;
-import org.apache.activemq.artemis.utils.collections.NodeStore;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.NodeStoreFactory;
import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
import org.slf4j.Logger;
@@ -868,7 +868,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
- public MessageReference removeWithSuppliedID(String serverID, long id,
NodeStore<MessageReference> nodeStore) {
+ public MessageReference removeWithSuppliedID(String serverID, long id,
NodeStoreFactory<MessageReference> nodeStore) {
return null;
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
index 841c8ec1a6..66fafbc3ca 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
@@ -24,10 +24,16 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.jms.Topic;
+import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.util.HashSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
@@ -59,9 +65,13 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class AMQPReplicaTest extends AmqpClientTestSupport {
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
protected static final int AMQP_PORT_2 = 5673;
protected static final int AMQP_PORT_3 = 5674;
public static final int TIME_BEFORE_RESTART = 1000;
@@ -834,6 +844,8 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
}
public Queue locateQueue(ActiveMQServer server, String queueName) throws
Exception {
+ Assert.assertNotNull(queueName);
+ Assert.assertNotNull(server);
Wait.waitFor(() -> server.locateQueue(queueName) != null);
return server.locateQueue(queueName);
}
@@ -1077,4 +1089,213 @@ public class AMQPReplicaTest extends
AmqpClientTestSupport {
conn.close();
}
+ private void consumeSubscription(int START_ID,
+ int LAST_ID,
+ int port,
+ String clientID,
+ String queueName,
+ String subscriptionName,
+ boolean assertNull) throws JMSException {
+ ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP",
"tcp://localhost:" + port);
+ Connection conn = cf.createConnection();
+ conn.setClientID(clientID);
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ conn.start();
+
+ HashSet<Integer> idsReceived = new HashSet<>();
+
+ Topic topic = sess.createTopic(queueName);
+
+ MessageConsumer consumer = sess.createDurableConsumer(topic,
subscriptionName);
+ for (int i = START_ID; i <= LAST_ID; i++) {
+ Message message = consumer.receive(3000);
+ Assert.assertNotNull(message);
+ Integer id = message.getIntProperty("i");
+ Assert.assertNotNull(id);
+ Assert.assertTrue(idsReceived.add(id));
+ }
+
+ if (assertNull) {
+ Assert.assertNull(consumer.receiveNoWait());
+ }
+
+ for (int i = START_ID; i <= LAST_ID; i++) {
+ Assert.assertTrue(idsReceived.remove(i));
+ }
+
+ Assert.assertTrue(idsReceived.isEmpty());
+ conn.close();
+ }
+
+ @Test
+ public void testMulticast() throws Exception {
+ multiCastReplicaTest(false, false, false, false, true);
+ }
+
+
+ @Test
+ public void testMulticastSerializeConsumption() throws Exception {
+ multiCastReplicaTest(false, false, false, false, false);
+ }
+
+ @Test
+ public void testMulticastTargetPaging() throws Exception {
+ multiCastReplicaTest(false, true, false, false, true);
+ }
+
+ @Test
+ public void testMulticastTargetSourcePaging() throws Exception {
+ multiCastReplicaTest(false, true, true, true, true);
+ }
+
+ @Test
+ public void testMulticastTargetLargeMessage() throws Exception {
+ multiCastReplicaTest(true, true, true, true, true);
+ }
+
+
+ private void multiCastReplicaTest(boolean largeMessage,
+ boolean pagingTarget,
+ boolean pagingSource,
+ boolean restartBrokerConnection, boolean
multiThreadConsumers) throws Exception {
+
+ String brokerConnectionName = "brokerConnectionName:" +
UUIDGenerator.getInstance().generateStringUUID();
+ final ActiveMQServer server = this.server;
+ server.setIdentity("targetServer");
+
+ server_2 = createServer(AMQP_PORT_2, false);
+ server_2.setIdentity("server_2");
+ server_2.getConfiguration().setName("thisone");
+
+ AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration(brokerConnectionName, "tcp://localhost:" +
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+ AMQPMirrorBrokerConnectionElement replica = new
AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true).setDurable(true);
+ replica.setName("theReplica");
+ amqpConnection.addElement(replica);
+ server_2.getConfiguration().addAMQPConnection(amqpConnection);
+ server_2.getConfiguration().setName("server_2");
+
+ int NUMBER_OF_MESSAGES = 200;
+
+ server_2.start();
+ server.start();
+ Wait.assertTrue(server_2::isStarted);
+ Wait.assertTrue(server::isStarted);
+
+ // We create the address to avoid auto delete on the queue
+ server_2.addAddressInfo(new
AddressInfo(getTopicName()).addRoutingType(RoutingType.MULTICAST).setAutoCreated(false));
+
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP",
"tcp://localhost:" + AMQP_PORT_2);
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(getTopicName());
+ MessageProducer producer = session.createProducer(topic);
+
+ for (int i = 0; i <= 1; i++) {
+ // just creating the subscription and not consuming anything
+ consumeSubscription(0, -1, AMQP_PORT_2, "client" + i, getTopicName(),
"subscription" + i, false);
+ }
+
+ String subs0Name = "client0.subscription0";
+ String subs1Name = "client1.subscription1";
+
+
+ Queue subs0Server1 = locateQueue(server, subs0Name);
+ Queue subs1Server1 = locateQueue(server, subs1Name);
+ Assert.assertNotNull(subs0Server1);
+ Assert.assertNotNull(subs1Server1);
+
+ Queue subs0Server2 = locateQueue(server_2, subs0Name);
+ Queue subs1Server2 = locateQueue(server_2, subs1Name);
+ Assert.assertNotNull(subs0Server2);
+ Assert.assertNotNull(subs1Server2);
+
+ if (pagingTarget) {
+ subs0Server1.getPagingStore().startPaging();
+ }
+
+ if (pagingSource) {
+ subs0Server2.getPagingStore().startPaging();
+ }
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ Message message = session.createTextMessage(getText(largeMessage, i));
+ message.setIntProperty("i", i);
+ producer.send(message);
+ }
+
+ if (pagingTarget) {
+ subs0Server1.getPagingStore().startPaging();
+ }
+
+ Queue snfreplica = server_2.locateQueue(replica.getMirrorSNF());
+
+ Assert.assertNotNull(snfreplica);
+
+ Wait.assertEquals(0, snfreplica::getMessageCount);
+
+ Wait.assertEquals(NUMBER_OF_MESSAGES, subs0Server1::getMessageCount,
2000);
+ Wait.assertEquals(NUMBER_OF_MESSAGES, subs1Server1::getMessageCount,
2000);
+
+ Wait.assertEquals(NUMBER_OF_MESSAGES, subs0Server2::getMessageCount,
2000);
+ Wait.assertEquals(NUMBER_OF_MESSAGES, subs1Server2::getMessageCount,
2000);
+
+ if (restartBrokerConnection) {
+ // stop and start the broker connection, making sure we wouldn't
duplicate the mirror
+ server_2.stopBrokerConnection(brokerConnectionName);
+ Thread.sleep(1000);
+ server_2.startBrokerConnection(brokerConnectionName);
+ }
+
+ Assert.assertSame(snfreplica,
server_2.locateQueue(replica.getMirrorSNF()));
+
+ if (pagingTarget) {
+ assertTrue(subs0Server1.getPagingStore().isPaging());
+ assertTrue(subs1Server1.getPagingStore().isPaging());
+ }
+
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+ runAfter(executorService::shutdownNow);
+
+ CountDownLatch done = new CountDownLatch(2);
+ AtomicInteger errors = new AtomicInteger(0);
+
+ for (int i = 0; i <= 1; i++) {
+ CountDownLatch threadDone = new CountDownLatch(1);
+ int subscriptionID = i;
+ executorService.execute(() -> {
+ try {
+ consumeSubscription(0, NUMBER_OF_MESSAGES - 1, AMQP_PORT_2,
"client" + subscriptionID, getTopicName(), "subscription" + subscriptionID,
false);
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ errors.incrementAndGet();
+ } finally {
+ done.countDown();
+ threadDone.countDown();
+ }
+ });
+ if (!multiThreadConsumers) {
+ threadDone.await(1, TimeUnit.MINUTES);
+ }
+ }
+
+ Assert.assertTrue(done.await(60, TimeUnit.SECONDS));
+ Assert.assertEquals(0, errors.get());
+
+ // Replica is async, so we need to wait acks to arrive before we finish
consuming there
+ Wait.assertEquals(0, snfreplica::getMessageCount);
+ Wait.assertEquals(0L, subs0Server1::getMessageCount, 2000, 100);
+ Wait.assertEquals(0L, subs1Server1::getMessageCount, 2000, 100);
+ Wait.assertEquals(0L, subs0Server2::getMessageCount, 2000, 100);
+ Wait.assertEquals(0L, subs1Server2::getMessageCount, 2000, 100);
+
+
+ if (largeMessage) {
+
validateNoFilesOnLargeDir(server.getConfiguration().getLargeMessagesDirectory(),
0);
+
validateNoFilesOnLargeDir(server_2.getConfiguration().getLargeMessagesDirectory(),
0);
+ }
+ }
+
+
+
}
diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml
index 79990fa897..680fc2497a 100644
--- a/tests/smoke-tests/pom.xml
+++ b/tests/smoke-tests/pom.xml
@@ -1366,6 +1366,38 @@
</args>
</configuration>
</execution>
+ <execution>
+ <phase>test-compile</phase>
+ <id>create-test-Mirror1</id>
+ <goals>
+ <goal>create</goal>
+ </goals>
+ <configuration>
+ <role>amq</role>
+ <user>admin</user>
+ <password>admin</password>
+ <allowAnonymous>true</allowAnonymous>
+ <noWeb>true</noWeb>
+
<instance>${basedir}/target/mirrored-subscriptions/broker1</instance>
+
<configuration>${basedir}/target/classes/servers/mirrored-subscriptions/broker1</configuration>
+ </configuration>
+ </execution>
+ <execution>
+ <phase>test-compile</phase>
+ <id>create-test-Mirror2</id>
+ <goals>
+ <goal>create</goal>
+ </goals>
+ <configuration>
+ <role>amq</role>
+ <user>admin</user>
+ <password>admin</password>
+ <allowAnonymous>true</allowAnonymous>
+ <noWeb>true</noWeb>
+
<instance>${basedir}/target/mirrored-subscriptions/broker2</instance>
+
<configuration>${basedir}/target/classes/servers/mirrored-subscriptions/broker2</configuration>
+ </configuration>
+ </execution>
</executions>
<dependencies>
<dependency>
diff --git
a/tests/smoke-tests/src/main/resources/servers/mirrored-subscriptions/broker1/broker.xml
b/tests/smoke-tests/src/main/resources/servers/mirrored-subscriptions/broker1/broker.xml
new file mode 100644
index 0000000000..763558fec9
--- /dev/null
+++
b/tests/smoke-tests/src/main/resources/servers/mirrored-subscriptions/broker1/broker.xml
@@ -0,0 +1,226 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <name>0.0.0.0</name>
+
+ <persistence-enabled>true</persistence-enabled>
+
+ <!-- this could be ASYNCIO or NIO
+ -->
+ <journal-type>NIO</journal-type>
+
+ <paging-directory>./data/paging</paging-directory>
+
+ <bindings-directory>./data/bindings</bindings-directory>
+
+ <journal-directory>./data/journal</journal-directory>
+
+
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+ <journal-datasync>true</journal-datasync>
+
+ <journal-min-files>2</journal-min-files>
+
+ <journal-pool-files>-1</journal-pool-files>
+
+ <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+ <security-enabled>false</security-enabled>
+
+ <!--
+ You can verify the network health of a particular NIC by specifying
the <network-check-NIC> element.
+ <network-check-NIC>theNicName</network-check-NIC>
+ -->
+
+ <!--
+ Use this to use an HTTP server to validate the network
+
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+ <!-- <network-check-period>10000</network-check-period> -->
+ <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+ <!-- this is a comma separated list, no spaces, just DNS or IPs
+ it should accept IPV6
+
+ Warning: Make sure you understand your network topology as this is
meant to validate if your network is valid.
+ Using IPs that could eventually disappear or be partially
visible may defeat the purpose.
+ You can use a list of multiple IPs, and if any successful
ping will make the server OK to continue running -->
+ <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+ <!-- use this to customize the ping used for ipv4 addresses -->
+ <!-- <network-check-ping-command>ping -c 1 -t %d
%s</network-check-ping-command> -->
+
+ <!-- use this to customize the ping used for ipv6 addresses -->
+ <!-- <network-check-ping6-command>ping6 -c 1
%2$s</network-check-ping6-command> -->
+
+
+
+
+ <!-- how often we are looking for how many bytes are being used on the
disk in ms -->
+ <disk-scan-period>5000</disk-scan-period>
+
+ <!-- once the disk hits this limit the system will block, or close the
connection in certain protocols
+ that won't support flow control. -->
+ <max-disk-usage>90</max-disk-usage>
+
+ <!-- the system will enter into page mode once you hit this limit.
+ This is an estimate in bytes of how much the messages are using in
memory
+
+ The system will use half of the available memory (-Xmx) by default
for the global-max-size.
+ You may specify a different value here if you need to customize it
to your needs.
+
+ <global-max-size>100Mb</global-max-size>
+
+ -->
+
+ <acceptors>
+
+ <!-- useEpoll means: it will use Netty epoll if you are on a system
(Linux) that supports it -->
+ <!-- useKQueue means: it will use Netty kqueue if you are on a system
(MacOS) that supports it -->
+ <!-- amqpCredits: The number of credits sent to AMQP producers -->
+ <!-- amqpLowCredits: The server will send the # credits specified at
amqpCredits at this low mark -->
+
+ <!-- Acceptor for every supported protocol -->
+ <acceptor
name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;useKQueue;amqpCredits=1000;amqpLowCredits=300</acceptor>
+ </acceptors>
+
+ <broker-connections>
+ <amqp-connection uri="tcp://localhost:61617" name="mirror"
retry-interval="100">
+ <mirror/>
+ </amqp-connection>
+ </broker-connections>
+
+ <security-settings>
+ <security-setting match="#">
+ <permission type="createNonDurableQueue" roles="guest"/>
+ <permission type="deleteNonDurableQueue" roles="guest"/>
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createAddress" roles="guest"/>
+ <permission type="deleteAddress" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="browse" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ <!-- we need this otherwise ./artemis data imp wouldn't work -->
+ <permission type="manage" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+ <address-settings>
+ <!-- if you define auto-create on certain queues, management has to
be auto-create -->
+ <address-setting match="activemq.management#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <address-setting match="myQueue">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+ <default-max-consumers>1</default-max-consumers>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+
+ <address-setting match="myTopic">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+ <default-max-consumers>1</default-max-consumers>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+
+ <address-setting match="myTopicPaging">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>100K</max-size-bytes>
+ <default-max-consumers>1</default-max-consumers>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ"/>
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue"/>
+ </anycast>
+ </address>
+ <address name="myQueue">
+ <anycast>
+ <!-- this should be maxed from the default -->
+ <queue name="myQueue">
+ </queue>
+ </anycast>
+ </address>
+ <address name="myTopic">
+ <multicast/>
+ </address>
+ <address name="myTopicPaging">
+ <multicast/>
+ </address>
+
+ </addresses>
+
+ </core>
+</configuration>
diff --git
a/tests/smoke-tests/src/main/resources/servers/mirrored-subscriptions/broker2/broker.xml
b/tests/smoke-tests/src/main/resources/servers/mirrored-subscriptions/broker2/broker.xml
new file mode 100644
index 0000000000..33c8a7ca3a
--- /dev/null
+++
b/tests/smoke-tests/src/main/resources/servers/mirrored-subscriptions/broker2/broker.xml
@@ -0,0 +1,220 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <name>0.0.0.0</name>
+
+ <persistence-enabled>true</persistence-enabled>
+
+ <!-- this could be ASYNCIO or NIO
+ -->
+ <journal-type>NIO</journal-type>
+
+ <paging-directory>./data/paging</paging-directory>
+
+ <bindings-directory>./data/bindings</bindings-directory>
+
+ <journal-directory>./data/journal</journal-directory>
+
+
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+ <journal-datasync>true</journal-datasync>
+
+ <journal-min-files>2</journal-min-files>
+
+ <journal-pool-files>-1</journal-pool-files>
+
+ <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+ <security-enabled>false</security-enabled>
+
+ <!--
+ You can verify the network health of a particular NIC by specifying
the <network-check-NIC> element.
+ <network-check-NIC>theNicName</network-check-NIC>
+ -->
+
+ <!--
+ Use this to use an HTTP server to validate the network
+
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+ <!-- <network-check-period>10000</network-check-period> -->
+ <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+ <!-- this is a comma separated list, no spaces, just DNS or IPs
+ it should accept IPV6
+
+ Warning: Make sure you understand your network topology as this is
meant to validate if your network is valid.
+ Using IPs that could eventually disappear or be partially
visible may defeat the purpose.
+ You can use a list of multiple IPs, and if any successful
ping will make the server OK to continue running -->
+ <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+ <!-- use this to customize the ping used for ipv4 addresses -->
+ <!-- <network-check-ping-command>ping -c 1 -t %d
%s</network-check-ping-command> -->
+
+ <!-- use this to customize the ping used for ipv6 addresses -->
+ <!-- <network-check-ping6-command>ping6 -c 1
%2$s</network-check-ping6-command> -->
+
+
+ <!-- how often we are looking for how many bytes are being used on the
disk in ms -->
+ <disk-scan-period>5000</disk-scan-period>
+
+ <!-- once the disk hits this limit the system will block, or close the
connection in certain protocols
+ that won't support flow control. -->
+ <max-disk-usage>90</max-disk-usage>
+
+ <!-- the system will enter into page mode once you hit this limit.
+ This is an estimate in bytes of how much the messages are using in
memory
+
+ The system will use half of the available memory (-Xmx) by default
for the global-max-size.
+ You may specify a different value here if you need to customize it
to your needs.
+
+ <global-max-size>100Mb</global-max-size>
+
+ -->
+
+ <acceptors>
+
+ <!-- useEpoll means: it will use Netty epoll if you are on a system
(Linux) that supports it -->
+ <!-- useKQueue means: it will use Netty kqueue if you are on a system
(MacOS) that supports it -->
+ <!-- amqpCredits: The number of credits sent to AMQP producers -->
+ <!-- amqpLowCredits: The server will send the # credits specified at
amqpCredits at this low mark -->
+
+ <!-- Acceptor for every supported protocol -->
+ <acceptor name="artemis">
+
tcp://0.0.0.0:61617?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;useKQueue;amqpCredits=1000;amqpLowCredits=300
+ </acceptor>
+ </acceptors>
+
+ <security-settings>
+ <security-setting match="#">
+ <permission type="createNonDurableQueue" roles="guest"/>
+ <permission type="deleteNonDurableQueue" roles="guest"/>
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createAddress" roles="guest"/>
+ <permission type="deleteAddress" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="browse" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ <!-- we need this otherwise ./artemis data imp wouldn't work -->
+ <permission type="manage" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+ <address-settings>
+ <!-- if you define auto-create on certain queues, management has to
be auto-create -->
+ <address-setting match="activemq.management#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <address-setting match="myQueue">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+ <default-max-consumers>1</default-max-consumers>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+
+ <address-setting match="myTopic">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+ <default-max-consumers>1</default-max-consumers>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+
+ <address-setting match="myTopicPaging">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>100K</max-size-bytes>
+ <default-max-consumers>1</default-max-consumers>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ"/>
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue"/>
+ </anycast>
+ </address>
+ <address name="myQueue">
+ <anycast>
+ <!-- this should be maxed from the default -->
+ <queue name="myQueue">
+ </queue>
+ </anycast>
+ </address>
+ <address name="myTopic">
+ <multicast/>
+ </address>
+ <address name="myTopicPaging">
+ <multicast/>
+ </address>
+
+ </addresses>
+
+ </core>
+</configuration>
diff --git
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirroredSubscriptionTest.java
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirroredSubscriptionTest.java
new file mode 100644
index 0000000000..69242281a9
--- /dev/null
+++
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirroredSubscriptionTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <br>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <br>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.brokerConnection;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.tests.smoke.common.SimpleManagement;
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.util.ServerUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MirroredSubscriptionTest extends SmokeTestBase {
+
+ public static final String SERVER_NAME_A = "mirrored-subscriptions/broker1";
+ public static final String SERVER_NAME_B = "mirrored-subscriptions/broker2";
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ // Change this to true to generate a print-data in certain cases on this
test
+ private static final boolean PRINT_DATA = false;
+ private static final String JMX_SERVER_HOSTNAME = "localhost";
+ private static final int JMX_SERVER_PORT = 11099;
+
+ Process processB;
+ Process processA;
+
+ @Before
+ public void beforeClass() throws Exception {
+ cleanupData(SERVER_NAME_A);
+ cleanupData(SERVER_NAME_B);
+ processB = startServer(SERVER_NAME_B, 1, 0);
+ processA = startServer(SERVER_NAME_A, 0, 0);
+
+ ServerUtil.waitForServerToStart(1, "B", "B", 30000);
+ ServerUtil.waitForServerToStart(0, "A", "A", 30000);
+ }
+
+ @Test
+ public void testSend() throws Throwable {
+
+ int COMMIT_INTERVAL = 100;
+ int NUMBER_OF_MESSAGES = 500;
+ int CLIENTS = 2;
+ String mainURI = "tcp://localhost:61616";
+ String secondURI = "tcp://localhost:61617";
+
+ String topicName = "myTopic";
+
+ ConnectionFactory cf = CFUtil.createConnectionFactory("amqp", mainURI);
+
+ for (int i = 0; i < CLIENTS; i++) {
+ try (Connection connection = cf.createConnection()) {
+ connection.setClientID("client" + i);
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ Topic topic = session.createTopic(topicName);
+ session.createDurableSubscriber(topic, "subscription" + i);
+ }
+ }
+
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ Topic topic = session.createTopic(topicName);
+ MessageProducer producer = session.createProducer(topic);
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ producer.send(session.createTextMessage("hello " + i));
+ if (i % COMMIT_INTERVAL == 0) {
+ session.commit();
+ }
+ }
+ session.commit();
+ }
+
+ Map<String, Integer> result = SimpleManagement.listQueues(mainURI, null,
null, 100);
+ result.entrySet().forEach(entry -> System.out.println("Queue " +
entry.getKey() + "=" + entry.getValue()));
+
+ checkMessages(NUMBER_OF_MESSAGES, CLIENTS, mainURI, secondURI);
+
+ ExecutorService executorService = Executors.newFixedThreadPool(CLIENTS);
+ runAfter(executorService::shutdownNow);
+
+ CountDownLatch done = new CountDownLatch(CLIENTS);
+ AtomicInteger errors = new AtomicInteger(0);
+
+ for (int i = 0; i < CLIENTS; i++) {
+ final int clientID = i;
+ executorService.execute(() -> {
+ try (Connection connection = cf.createConnection()) {
+ connection.setClientID("client" + clientID);
+ connection.start();
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ Topic topic = session.createTopic(topicName);
+ TopicSubscriber subscriber =
session.createDurableSubscriber(topic, "subscription" + clientID);
+ for (int messageI = 0; messageI < NUMBER_OF_MESSAGES;
messageI++) {
+ TextMessage message = (TextMessage) subscriber.receive(5000);
+ Assert.assertNotNull(message);
+ if (messageI % COMMIT_INTERVAL == 0) {
+ session.commit();
+ }
+ }
+ session.commit();
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ errors.incrementAndGet();
+ } finally {
+ done.countDown();
+ }
+ });
+ }
+
+ Assert.assertTrue(done.await(300, TimeUnit.SECONDS));
+ Assert.assertEquals(0, errors.get());
+ checkMessages(0, CLIENTS, mainURI, secondURI);
+ }
+
+ private void checkMessages(int NUMBER_OF_MESSAGES, int CLIENTS, String
mainURI, String secondURI) throws Exception {
+ for (int i = 0; i < CLIENTS; i++) {
+ final int clientID = i;
+ Wait.assertEquals(NUMBER_OF_MESSAGES, () -> getMessageCount(mainURI,
"client" + clientID + ".subscription" + clientID));
+ Wait.assertEquals(NUMBER_OF_MESSAGES, () ->
getMessageCount(secondURI, "client" + clientID + ".subscription" + clientID));
+ }
+ }
+
+ int getMessageCount(String uri, String queueName) throws Exception {
+ Map<String, Integer> result = SimpleManagement.listQueues(uri, null,
null, 100);
+ Integer resultReturn = result.get(queueName);
+
+ logger.debug("Result = {}, queueName={}, returnValue = {}", result,
queueName, resultReturn);
+ return resultReturn == null ? 0 : resultReturn;
+
+ }
+
+}
diff --git
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SimpleManagement.java
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SimpleManagement.java
new file mode 100644
index 0000000000..5b9a34b2df
--- /dev/null
+++
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SimpleManagement.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.common;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.JsonUtil;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.management.ManagementHelper;
+import org.apache.activemq.artemis.json.JsonArray;
+import org.apache.activemq.artemis.json.JsonObject;
+
+public class SimpleManagement {
+
+ private static final String SIMPLE_OPTIONS =
"{\"field\":\"\",\"value\":\"\",\"operation\":\"\"}";
+
+ /** Simple management function that will return a list of Pair<Name of
Queue, Number of Messages> */
+ public static Map<String, Integer> listQueues(String uri, String user,
String password, int maxRows) throws Exception {
+ Map<String, Integer> queues = new HashMap<>();
+ ManagementHelper.doManagement(uri, user, password, t ->
setupListQueue(t, maxRows), t -> listQueueResult(t, queues),
SimpleManagement::failed);
+ return queues;
+ }
+
+ private static void setupListQueue(ClientMessage m, int maxRows) throws
Exception {
+ ManagementHelper.putOperationInvocation(m, "broker", "listQueues",
SIMPLE_OPTIONS, 1, maxRows);
+ }
+
+ private static void listQueueResult(ClientMessage message, Map<String,
Integer> mapQueues) throws Exception {
+
+ final String result = (String) ManagementHelper.getResult(message,
String.class);
+
+
+ JsonObject queuesAsJsonObject = JsonUtil.readJsonObject(result);
+ JsonArray array = queuesAsJsonObject.getJsonArray("data");
+
+ for (int i = 0; i < array.size(); i++) {
+ JsonObject object = array.getJsonObject(i);
+ String name = object.getString("name");
+ String messageCount = object.getString("messageCount");
+ mapQueues.put(name, Integer.parseInt(messageCount));
+ }
+
+ }
+
+ private static void failed(ClientMessage message) throws Exception {
+
+ final String result = (String) ManagementHelper.getResult(message,
String.class);
+
+ throw new Exception("Failed " + result);
+ }
+
+}
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index bb779ac2a0..cd30f4b054 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -40,8 +40,8 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ReferenceCounter;
-import org.apache.activemq.artemis.utils.collections.NodeStore;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.NodeStoreFactory;
import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
@@ -143,7 +143,7 @@ public class FakeQueue extends CriticalComponentImpl
implements Queue {
}
@Override
- public MessageReference removeWithSuppliedID(String serverID, long id,
NodeStore<MessageReference> nodeStore) {
+ public MessageReference removeWithSuppliedID(String serverID, long id,
NodeStoreFactory<MessageReference> nodeStore) {
return null;
}