This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 99302b1935 ARTEMIS-3753 Prevent sending message to internal queues on
mirror
new a38fae1fbd This closes #4012
99302b1935 is described below
commit 99302b193527c678b525a198f0ffa41615eca498
Author: iliya <[email protected]>
AuthorDate: Tue Apr 5 00:21:37 2022 +0300
ARTEMIS-3753 Prevent sending message to internal queues on mirror
In a cluster configuration, messages could be routed to internal queues for
further delivery to a different broker. We need to check for that before
sending to SNF; otherwise the message can get stuck on the target server and
will never receive an ACK.
co-author: Clebert Suconic
Discussions on https://github.com/apache/activemq-artemis/pull/4012 and
https://github.com/apache/activemq-artemis/pull/4038
---
.../connect/mirror/AMQPMirrorControllerSource.java | 7 +
.../connect/mirror/AMQPMirrorControllerTarget.java | 3 +-
.../artemis/core/postoffice/impl/BindingsImpl.java | 17 +-
.../artemis/core/server/RoutingContext.java | 8 +
.../core/server/impl/RoutingContextImpl.java | 30 +
.../core/server/impl/RoutingContextTest.java | 945 +++++++++++++++++++++
.../amqp/connect/AMQPClusterReplicaTest.java | 172 ++++
7 files changed, 1176 insertions(+), 6 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index 6814aaeeb9..2d7cd10008 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -201,6 +201,13 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
return;
}
+ if (context.isInternal()) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("server " + server + " is discarding send to avoid
sending to internal queue");
+ }
+ return;
+ }
+
if (logger.isTraceEnabled()) {
logger.trace(server + " send message " + message);
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
index dd07dbb747..5c664bc7e3 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
@@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
+import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
@@ -459,7 +460,7 @@ public class AMQPMirrorControllerTarget extends
ProtonAbstractReceiver implement
routingContext.setTransaction(transaction);
duplicateIDCache.addToCache(duplicateIDBytes, transaction);
- routingContext.clear().setMirrorSource(this);
+
routingContext.clear().setMirrorSource(this).setLoadBalancingType(MessageLoadBalancingType.OFF);
server.getPostOffice().route(message, routingContext, false);
// We use this as part of a transaction because of the duplicate
detection cache that needs to be done atomically
transaction.commit();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index 75fb7b810b..13c95ddcc0 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -325,6 +325,14 @@ public final class BindingsImpl implements Bindings {
}
}
+ private MessageLoadBalancingType getMessageLoadBalancingType(RoutingContext
context) { // resolves the load-balancing type to use for this routing call
+ if (context.getLoadBalancingType() != null) { // the routing context carries its own type
+ return context.getLoadBalancingType(); // the per-context value takes precedence
+ } else {
+ return this.messageLoadBalancingType; // otherwise fall back to the type held by this BindingsImpl
+ }
+ }
+
private void simpleRouting(final Message message,
final RoutingContext context,
final int currentVersion) throws Exception {
@@ -333,7 +341,7 @@ public final class BindingsImpl implements Bindings {
}
routingNameBindingMap.forEachBindings((bindings, nextPosition) -> {
- final Binding nextBinding = getNextBinding(message, bindings,
nextPosition);
+ final Binding nextBinding = getNextBinding(message, bindings,
nextPosition, getMessageLoadBalancingType(context));
if (nextBinding != null && nextBinding.getFilter() == null &&
nextBinding.isLocal() && bindings.length == 1) {
context.setReusable(true, currentVersion);
} else {
@@ -362,7 +370,8 @@ public final class BindingsImpl implements Bindings {
*/
private Binding getNextBinding(final Message message,
final Binding[] bindings,
- final CopyOnWriteBindings.BindingIndex
bindingIndex) {
+ final CopyOnWriteBindings.BindingIndex
bindingIndex,
+ final MessageLoadBalancingType
loadBalancingType) {
int nextPosition = bindingIndex.getIndex();
final int bindingsCount = bindings.length;
@@ -373,8 +382,6 @@ public final class BindingsImpl implements Bindings {
Binding nextBinding = null;
int lastLowPriorityBinding = -1;
- // snapshot this, to save loading it on each iteration
- final MessageLoadBalancingType loadBalancingType =
this.messageLoadBalancingType;
for (int i = 0; i < bindingsCount; i++) {
final Binding binding = bindings[nextPosition];
@@ -438,7 +445,7 @@ public final class BindingsImpl implements Bindings {
if (resp == null) {
// ok let's find the next binding to propose
- Binding theBinding = getNextBinding(message, bindings,
nextPosition);
+ Binding theBinding = getNextBinding(message, bindings,
nextPosition, getMessageLoadBalancingType(context));
if (theBinding == null) {
return;
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
index 5593d0a5bc..fa781fdb86 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
@@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
+import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -41,6 +42,9 @@ public interface RoutingContext {
* to avoid*/
boolean isMirrorController();
+ /** Returns true only if at least one queue has been routed and every routed queue is internal. */
+ boolean isInternal();
+
MirrorController getMirrorSource();
RoutingContext setMirrorSource(MirrorController mirrorController);
@@ -95,5 +99,9 @@ public interface RoutingContext {
RoutingContext setDuplicateDetection(boolean value);
+ RoutingContext setLoadBalancingType(MessageLoadBalancingType
messageLoadBalancingType);
+
+ MessageLoadBalancingType getLoadBalancingType();
+
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
index 3dc6be463f..033c3a719c 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.api.core.RoutingType;
+import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.jboss.logging.Logger;
@@ -52,6 +53,9 @@ public class RoutingContextImpl implements RoutingContext {
private RoutingType previousRoutingType;
+ // when non-null, overrides the load balancing type configured elsewhere
+ private MessageLoadBalancingType loadBalancingType;
+
/* To be set by the Mirror target on the server, to avoid ping pongs or
reflections of messages between mirrors */
private MirrorController mirrorControllerSource;
@@ -59,6 +63,8 @@ public class RoutingContextImpl implements RoutingContext {
Boolean reusable = null;
+ Boolean internalOnly = null;
+
volatile int version;
private final Executor executor;
@@ -95,6 +101,11 @@ public class RoutingContextImpl implements RoutingContext {
return reusable != null && reusable;
}
+ @Override
+ public boolean isInternal() { // true only when internalOnly has been set and is true
+ return internalOnly != null && internalOnly; // a null internalOnly yields false
+ }
+
@Override
public int getPreviousBindingsVersion() {
return version;
@@ -138,6 +149,8 @@ public class RoutingContextImpl implements RoutingContext {
this.reusable = null;
+ this.internalOnly = null;
+
return this;
}
@@ -163,6 +176,13 @@ public class RoutingContextImpl implements RoutingContext {
listing.getNonDurableQueues().add(queue);
}
+ if (internalOnly == null) {
+ internalOnly = true;
+ }
+
+ // every queue added has to be internal only
+ internalOnly = internalOnly && queue.isInternalQueue();
+
queueCount++;
}
@@ -198,6 +218,16 @@ public class RoutingContextImpl implements RoutingContext {
}
}
+ @Override
+ public RoutingContextImpl setLoadBalancingType(MessageLoadBalancingType
messageLoadBalancingType) { // stores the given type (null is accepted as-is) on this context
+ this.loadBalancingType = messageLoadBalancingType;
+ return this; // returns this context so calls can be chained
+ }
+
+ @Override
+ public MessageLoadBalancingType getLoadBalancingType() { // plain getter for the loadBalancingType field
+ return loadBalancingType; // may be null: the field is declared without an initializer
+ }
@Override
public void addQueueWithAck(SimpleString address, Queue queue) {
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java
new file mode 100644
index 0000000000..49fcfcd1be
--- /dev/null
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java
@@ -0,0 +1,945 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.filter.Filter;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import
org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.server.Consumer;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.apache.activemq.artemis.utils.ReferenceCounter;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.NodeStore;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.apache.activemq.artemis.utils.critical.CriticalCloseable;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RoutingContextTest {
+
+ private static class FakeQueueForRoutingContextTest implements Queue {
+
+ final String name;
+ final boolean isInternal;
+ final boolean durable;
+
+ FakeQueueForRoutingContextTest(String name, boolean isInternal, boolean
durable) { // test stub constructor: only records the three values in final fields
+ this.name = name;
+ this.isInternal = isInternal;
+ this.durable = durable;
+ }
+
+ @Override
+ public CriticalAnalyzer getCriticalAnalyzer() {
+ return null;
+ }
+
+ @Override
+ public CriticalCloseable measureCritical(int path) {
+ return null;
+ }
+
+ @Override
+ public boolean checkExpiration(long timeout, boolean reset) {
+ return false;
+ }
+
+ @Override
+ public void route(Message message, RoutingContext context) throws
Exception {
+
+ }
+
+ @Override
+ public void routeWithAck(Message message, RoutingContext context) throws
Exception {
+
+ }
+
+ @Override
+ public SimpleString getName() {
+ return SimpleString.toSimpleString(name);
+ }
+
+ @Override
+ public Long getID() {
+ return null;
+ }
+
+ @Override
+ public Filter getFilter() {
+ return null;
+ }
+
+ @Override
+ public void setFilter(Filter filter) {
+
+ }
+
+ @Override
+ public PagingStore getPagingStore() {
+ return null;
+ }
+
+ @Override
+ public PageSubscription getPageSubscription() {
+ return null;
+ }
+
+ @Override
+ public RoutingType getRoutingType() {
+ return null;
+ }
+
+ @Override
+ public void setRoutingType(RoutingType routingType) {
+
+ }
+
+ @Override
+ public boolean allowsReferenceCallback() {
+ return false;
+ }
+
+ @Override
+ public boolean isDurable() { // reports the durable flag passed to the constructor
+ return durable;
+ }
+
+ @Override
+ public int durableUp(Message message) {
+ return 0;
+ }
+
+ @Override
+ public int durableDown(Message message) {
+ return 0;
+ }
+
+ @Override
+ public void refUp(MessageReference messageReference) {
+
+ }
+
+ @Override
+ public void refDown(MessageReference messageReference) {
+
+ }
+
+ @Override
+ public MessageReference removeWithSuppliedID(String serverID, long id,
NodeStore<MessageReference> nodeStore) {
+ return null;
+ }
+
+ @Override
+ public boolean isDurableMessage() {
+ return false;
+ }
+
+ @Override
+ public boolean isAutoDelete() {
+ return false;
+ }
+
+ @Override
+ public long getAutoDeleteDelay() {
+ return 0;
+ }
+
+ @Override
+ public long getAutoDeleteMessageCount() {
+ return 0;
+ }
+
+ @Override
+ public boolean isTemporary() {
+ return false;
+ }
+
+ @Override
+ public boolean isAutoCreated() {
+ return false;
+ }
+
+ @Override
+ public boolean isPurgeOnNoConsumers() {
+ return false;
+ }
+
+ @Override
+ public void setPurgeOnNoConsumers(boolean value) {
+
+ }
+
+ @Override
+ public boolean isEnabled() {
+ return false;
+ }
+
+ @Override
+ public void setEnabled(boolean value) {
+
+ }
+
+ @Override
+ public int getConsumersBeforeDispatch() {
+ return 0;
+ }
+
+ @Override
+ public void setConsumersBeforeDispatch(int consumersBeforeDispatch) {
+
+ }
+
+ @Override
+ public long getDelayBeforeDispatch() {
+ return 0;
+ }
+
+ @Override
+ public void setDelayBeforeDispatch(long delayBeforeDispatch) {
+
+ }
+
+ @Override
+ public long getDispatchStartTime() {
+ return 0;
+ }
+
+ @Override
+ public boolean isDispatching() {
+ return false;
+ }
+
+ @Override
+ public void setDispatching(boolean dispatching) {
+
+ }
+
+ @Override
+ public boolean isExclusive() {
+ return false;
+ }
+
+ @Override
+ public void setExclusive(boolean value) {
+
+ }
+
+ @Override
+ public boolean isLastValue() {
+ return false;
+ }
+
+ @Override
+ public SimpleString getLastValueKey() {
+ return null;
+ }
+
+ @Override
+ public boolean isNonDestructive() {
+ return false;
+ }
+
+ @Override
+ public void setNonDestructive(boolean nonDestructive) {
+
+ }
+
+ @Override
+ public int getMaxConsumers() {
+ return 0;
+ }
+
+ @Override
+ public void setMaxConsumer(int maxConsumers) {
+
+ }
+
+ @Override
+ public int getGroupBuckets() {
+ return 0;
+ }
+
+ @Override
+ public void setGroupBuckets(int groupBuckets) {
+
+ }
+
+ @Override
+ public boolean isGroupRebalance() {
+ return false;
+ }
+
+ @Override
+ public void setGroupRebalance(boolean groupRebalance) {
+
+ }
+
+ @Override
+ public boolean isGroupRebalancePauseDispatch() {
+ return false;
+ }
+
+ @Override
+ public void setGroupRebalancePauseDispatch(boolean
groupRebalancePauseDisptach) {
+
+ }
+
+ @Override
+ public SimpleString getGroupFirstKey() {
+ return null;
+ }
+
+ @Override
+ public void setGroupFirstKey(SimpleString groupFirstKey) {
+
+ }
+
+ @Override
+ public boolean isConfigurationManaged() {
+ return false;
+ }
+
+ @Override
+ public void setConfigurationManaged(boolean configurationManaged) {
+
+ }
+
+ @Override
+ public void addConsumer(Consumer consumer) throws Exception {
+
+ }
+
+ @Override
+ public void addLingerSession(String sessionId) {
+
+ }
+
+ @Override
+ public void removeLingerSession(String sessionId) {
+
+ }
+
+ @Override
+ public void removeConsumer(Consumer consumer) {
+
+ }
+
+ @Override
+ public int getConsumerCount() {
+ return 0;
+ }
+
+ @Override
+ public long getConsumerRemovedTimestamp() {
+ return 0;
+ }
+
+ @Override
+ public void setRingSize(long ringSize) {
+
+ }
+
+ @Override
+ public long getRingSize() {
+ return 0;
+ }
+
+ @Override
+ public ReferenceCounter getConsumersRefCount() {
+ return null;
+ }
+
+ @Override
+ public void addSorted(List<MessageReference> refs, boolean scheduling) {
+
+ }
+
+ @Override
+ public void reload(MessageReference ref) {
+
+ }
+
+ @Override
+ public void addTail(MessageReference ref) {
+
+ }
+
+ @Override
+ public void addTail(MessageReference ref, boolean direct) {
+
+ }
+
+ @Override
+ public void addHead(MessageReference ref, boolean scheduling) {
+
+ }
+
+ @Override
+ public void addSorted(MessageReference ref, boolean scheduling) {
+
+ }
+
+ @Override
+ public void addHead(List<MessageReference> refs, boolean scheduling) {
+
+ }
+
+ @Override
+ public void acknowledge(MessageReference ref) throws Exception {
+
+ }
+
+ @Override
+ public void acknowledge(MessageReference ref, ServerConsumer consumer)
throws Exception {
+
+ }
+
+ @Override
+ public void acknowledge(MessageReference ref, AckReason reason,
ServerConsumer consumer) throws Exception {
+
+ }
+
+ @Override
+ public void acknowledge(Transaction tx, MessageReference ref) throws
Exception {
+
+ }
+
+ @Override
+ public void acknowledge(Transaction tx,
+ MessageReference ref,
+ AckReason reason,
+ ServerConsumer consumer) throws Exception {
+
+ }
+
+ @Override
+ public void reacknowledge(Transaction tx, MessageReference ref) throws
Exception {
+
+ }
+
+ @Override
+ public void cancel(Transaction tx, MessageReference ref) {
+
+ }
+
+ @Override
+ public void cancel(Transaction tx, MessageReference ref, boolean
ignoreRedeliveryCheck) {
+
+ }
+
+ @Override
+ public void cancel(MessageReference reference, long timeBase) throws
Exception {
+
+ }
+
+ @Override
+ public void deliverAsync() {
+
+ }
+
+ @Override
+ public void unproposed(SimpleString groupID) {
+
+ }
+
+ @Override
+ public void forceDelivery() {
+
+ }
+
+ @Override
+ public void deleteQueue() throws Exception {
+
+ }
+
+ @Override
+ public void deleteQueue(boolean removeConsumers) throws Exception {
+
+ }
+
+ @Override
+ public void removeAddress() throws Exception {
+
+ }
+
+ @Override
+ public void destroyPaging() throws Exception {
+
+ }
+
+ @Override
+ public long getMessageCount() {
+ return 0;
+ }
+
+ @Override
+ public long getPersistentSize() {
+ return 0;
+ }
+
+ @Override
+ public long getDurableMessageCount() {
+ return 0;
+ }
+
+ @Override
+ public long getDurablePersistentSize() {
+ return 0;
+ }
+
+ @Override
+ public int getDeliveringCount() {
+ return 0;
+ }
+
+ @Override
+ public long getDeliveringSize() {
+ return 0;
+ }
+
+ @Override
+ public int getDurableDeliveringCount() {
+ return 0;
+ }
+
+ @Override
+ public long getDurableDeliveringSize() {
+ return 0;
+ }
+
+ @Override
+ public void referenceHandled(MessageReference ref) {
+
+ }
+
+ @Override
+ public int getScheduledCount() {
+ return 0;
+ }
+
+ @Override
+ public long getScheduledSize() {
+ return 0;
+ }
+
+ @Override
+ public int getDurableScheduledCount() {
+ return 0;
+ }
+
+ @Override
+ public long getDurableScheduledSize() {
+ return 0;
+ }
+
+ @Override
+ public List<MessageReference> getScheduledMessages() {
+ return null;
+ }
+
+ @Override
+ public Map<String, List<MessageReference>> getDeliveringMessages() {
+ return null;
+ }
+
+ @Override
+ public long getMessagesAdded() {
+ return 0;
+ }
+
+ @Override
+ public long getAcknowledgeAttempts() {
+ return 0;
+ }
+
+ @Override
+ public long getMessagesAcknowledged() {
+ return 0;
+ }
+
+ @Override
+ public long getMessagesExpired() {
+ return 0;
+ }
+
+ @Override
+ public long getMessagesKilled() {
+ return 0;
+ }
+
+ @Override
+ public long getMessagesReplaced() {
+ return 0;
+ }
+
+ @Override
+ public MessageReference removeReferenceWithID(long id) throws Exception {
+ return null;
+ }
+
+ @Override
+ public MessageReference getReference(long id) throws ActiveMQException {
+ return null;
+ }
+
+ @Override
+ public int deleteAllReferences() throws Exception {
+ return 0;
+ }
+
+ @Override
+ public int deleteAllReferences(int flushLimit) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public boolean deleteReference(long messageID) throws Exception {
+ return false;
+ }
+
+ @Override
+ public int deleteMatchingReferences(Filter filter) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public int deleteMatchingReferences(int flushLImit, Filter filter,
AckReason ackReason) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public boolean expireReference(long messageID) throws Exception {
+ return false;
+ }
+
+ @Override
+ public int expireReferences(Filter filter) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public void expireReferences(Runnable done) {
+
+ }
+
+ @Override
+ public void expire(MessageReference ref) throws Exception {
+
+ }
+
+ @Override
+ public void expire(MessageReference ref, ServerConsumer consumer) throws
Exception {
+
+ }
+
+ @Override
+ public boolean sendMessageToDeadLetterAddress(long messageID) throws
Exception {
+ return false;
+ }
+
+ @Override
+ public int sendMessagesToDeadLetterAddress(Filter filter) throws
Exception {
+ return 0;
+ }
+
+ @Override
+ public boolean sendToDeadLetterAddress(Transaction tx, MessageReference
ref) throws Exception {
+ return false;
+ }
+
+ @Override
+ public boolean changeReferencePriority(long messageID, byte newPriority)
throws Exception {
+ return false;
+ }
+
+ @Override
+ public int changeReferencesPriority(Filter filter, byte newPriority)
throws Exception {
+ return 0;
+ }
+
+ @Override
+ public boolean moveReference(long messageID,
+ SimpleString toAddress,
+ Binding binding,
+ boolean rejectDuplicates) throws Exception {
+ return false;
+ }
+
+ @Override
+ public int moveReferences(Filter filter, SimpleString toAddress, Binding
binding) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public int moveReferences(int flushLimit,
+ Filter filter,
+ SimpleString toAddress,
+ boolean rejectDuplicates,
+ Binding binding) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public int moveReferences(int flushLimit,
+ Filter filter,
+ SimpleString toAddress,
+ boolean rejectDuplicates,
+ int messageCount,
+ Binding binding) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public int retryMessages(Filter filter) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public void addRedistributor(long delay) {
+
+ }
+
+ @Override
+ public void cancelRedistributor() {
+
+ }
+
+ @Override
+ public boolean hasMatchingConsumer(Message message) {
+ return false;
+ }
+
+ @Override
+ public Collection<Consumer> getConsumers() {
+ return null;
+ }
+
+ @Override
+ public Map<SimpleString, Consumer> getGroups() {
+ return null;
+ }
+
+ @Override
+ public void resetGroup(SimpleString groupID) {
+
+ }
+
+ @Override
+ public void resetAllGroups() {
+
+ }
+
+ @Override
+ public int getGroupCount() {
+ return 0;
+ }
+
+ @Override
+ public Pair<Boolean, Boolean> checkRedelivery(MessageReference ref,
+ long timeBase,
+ boolean
ignoreRedeliveryDelay) throws Exception {
+ return null;
+ }
+
+ @Override
+ public LinkedListIterator<MessageReference> iterator() {
+ return null;
+ }
+
+ @Override
+ public LinkedListIterator<MessageReference> browserIterator() {
+ return null;
+ }
+
+ @Override
+ public SimpleString getExpiryAddress() {
+ return null;
+ }
+
+ @Override
+ public SimpleString getDeadLetterAddress() {
+ return null;
+ }
+
+ @Override
+ public void pause() {
+
+ }
+
+ @Override
+ public void pause(boolean persist) {
+
+ }
+
+ @Override
+ public void reloadPause(long recordID) {
+
+ }
+
+ @Override
+ public void resume() {
+
+ }
+
+ @Override
+ public boolean isPaused() {
+ return false;
+ }
+
+ @Override
+ public boolean isPersistedPause() {
+ return false;
+ }
+
+ @Override
+ public Executor getExecutor() {
+ return null;
+ }
+
+ @Override
+ public void resetAllIterators() {
+
+ }
+
+ @Override
+ public boolean flushExecutor() {
+ return false;
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+
+ @Override
+ public boolean isDirectDeliver() {
+ return false;
+ }
+
+ @Override
+ public SimpleString getAddress() {
+ return SimpleString.toSimpleString(name);
+ }
+
+ @Override
+ public boolean isInternalQueue() { // reports the isInternal flag passed to the constructor
+ return isInternal;
+ }
+
+ @Override
+ public void setInternalQueue(boolean internalQueue) {
+
+ }
+
+ @Override
+ public void resetMessagesAdded() {
+
+ }
+
+ @Override
+ public void resetMessagesAcknowledged() {
+
+ }
+
+ @Override
+ public void resetMessagesExpired() {
+
+ }
+
+ @Override
+ public void resetMessagesKilled() {
+
+ }
+
+ @Override
+ public void incrementMesssagesAdded() {
+
+ }
+
+ @Override
+ public void deliverScheduledMessages() throws ActiveMQException {
+
+ }
+
+ @Override
+ public void postAcknowledge(MessageReference ref, AckReason reason) {
+
+ }
+
+ @Override
+ public SimpleString getUser() {
+ return null;
+ }
+
+ @Override
+ public void setUser(SimpleString user) {
+
+ }
+
+ @Override
+ public void recheckRefCount(OperationContext context) {
+
+ }
+ }
+
+ @Test
+ public void testValidateInternal() { // checks how isInternal() changes as queues are added and after clear()
+ RoutingContext context = new RoutingContextImpl(new TransactionImpl(new
NullStorageManager()));
+ Assert.assertFalse(context.isInternal()); // no queue added yet: not internal
+
+ context.addQueue(SimpleString.toSimpleString("t1"), new
FakeQueueForRoutingContextTest("t1", true, true));
+ Assert.assertTrue(context.isInternal()); // only an internal queue so far
+
+ context.addQueue(SimpleString.toSimpleString("t2"), new
FakeQueueForRoutingContextTest("t2", false, true));
+ Assert.assertFalse(context.isInternal()); // one non-internal queue flips the result to false
+
+ context.addQueue(SimpleString.toSimpleString("t3"), new
FakeQueueForRoutingContextTest("t3", true, true));
+ Assert.assertFalse(context.isInternal()); // stays false even though the latest queue is internal
+
+ context.clear(); // reset the context before the all-internal scenario
+ Assert.assertFalse(context.isInternal()); // back to the "no queue added" answer
+
+ context.addQueue(SimpleString.toSimpleString("t1"), new
FakeQueueForRoutingContextTest("t1", true, true));
+ Assert.assertTrue(context.isInternal());
+
+ context.addQueue(SimpleString.toSimpleString("t2"), new
FakeQueueForRoutingContextTest("t2", true, true));
+ Assert.assertTrue(context.isInternal());
+
+ context.addQueue(SimpleString.toSimpleString("t3"), new
FakeQueueForRoutingContextTest("t3", true, true));
+ Assert.assertTrue(context.isInternal()); // every added queue was internal, so still true
+ }
+
+}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPClusterReplicaTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPClusterReplicaTest.java
new file mode 100644
index 0000000000..69e026f1cd
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPClusterReplicaTest.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Test;
+
+import java.util.Collections;
+
+/**
+ * Exercises AMQP mirroring when one end of the mirror is a member of a core cluster.
+ * <p>
+ * Three brokers take part in every test: the {@code server} inherited from
+ * {@link AmqpClientTestSupport} (addressed through {@code AMQP_PORT} by these tests) and
+ * two brokers that are clustered with each other through static connectors on
+ * {@link #NODE_1_PORT} and {@link #NODE_2_PORT}.
+ */
+public class AMQPClusterReplicaTest extends AmqpClientTestSupport {
+
+   protected static final int NODE_1_PORT = 5673;
+   protected static final int NODE_2_PORT = 5674;
+
+   /** Name used for both the address and the queue created on every broker. */
+   private static final String TEST_QUEUE = "test";
+
+   /** Queue on node 1 that is polled until the broker connection named "mirror" has emptied it. */
+   private static final String MIRROR_SNF_QUEUE = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";
+
+   /**
+    * Node 1 (clustered with node 2, ON_DEMAND load balancing) mirrors to {@code server}.
+    * Messages are produced on node 1 and consumed on node 2; once the mirror queue on
+    * node 1 is empty, the mirrored queue on {@code server} must hold no messages.
+    */
+   @Test
+   public void testReplicaWithCluster() throws Exception {
+      ActiveMQServer node1 = createNode1(MessageLoadBalancingType.ON_DEMAND);
+      ActiveMQServer node2 = createNode2(MessageLoadBalancingType.ON_DEMAND);
+
+      server.start();
+
+      // node1 is the mirror source, the inherited server is the mirror target
+      node1.getConfiguration().addAMQPConnection(createMirrorConnection("mirror", AMQP_PORT));
+
+      node1.start();
+      node2.start();
+
+      configureAddressAndQueue(node1);
+      configureAddressAndQueue(node2);
+
+      waitForTopology(node1, 2);
+      waitForTopology(node2, 2);
+
+      // sender: connected to node1
+      ClientSessionFactory senderFactory = addSessionFactory(getNode1ServerLocator().createSessionFactory());
+      ClientSession senderSession = addClientSession(senderFactory.createSession());
+      sendMessages(senderSession, addClientProducer(senderSession.createProducer(TEST_QUEUE)), 10);
+
+      // receiver: connected to node2
+      ClientSessionFactory receiverFactory = addSessionFactory(getNode2ServerLocator().createSessionFactory());
+      ClientSession receiverSession = addClientSession(receiverFactory.createSession());
+      receiverSession.start();
+      receiveMessages(addClientConsumer(receiverSession.createConsumer(TEST_QUEUE)), 0, 10, true);
+
+      // The mirror target must have read everything before its queue is inspected. This is
+      // asserted (not merely waited for) so that a mirror queue that never drains fails the
+      // test instead of letting the final check pass against a target that saw nothing.
+      Wait.assertTrue("mirror queue on node1 was not drained", () -> node1.locateQueue(MIRROR_SNF_QUEUE).getMessageCount() == 0);
+
+      // Expect no messages in mirrored test queue
+      Wait.assertEquals(0, () -> server.locateQueue(TEST_QUEUE).getMessageCount());
+   }
+
+   /**
+    * {@code server} mirrors to node 1, which is clustered with node 2 using STRICT load
+    * balancing. Ten messages sent to {@code server} over AMQP must all show up in the
+    * test queue of node 1.
+    */
+   @Test
+   public void testReplicaWithClusterTargetStrict() throws Exception {
+      ActiveMQServer node1 = createNode1(MessageLoadBalancingType.STRICT);
+      ActiveMQServer node2 = createNode2(MessageLoadBalancingType.STRICT);
+
+      // The inherited server is restarted with a mirror connection that targets node1
+      server.stop();
+      server.getConfiguration().addAMQPConnection(createMirrorConnection("mirror1", NODE_1_PORT));
+      server.start();
+
+      node1.start();
+      node2.start();
+
+      configureAddressAndQueue(node1);
+      configureAddressAndQueue(server);
+      configureAddressAndQueue(node2);
+
+      waitForTopology(node1, 2);
+      waitForTopology(node2, 2);
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+      // try-with-resources: the connection (and the session/producer it owns) is closed even
+      // when the assertion below fails
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(TEST_QUEUE);
+         MessageProducer producer = session.createProducer(queue);
+         for (int i = 0; i < 10; i++) {
+            producer.send(session.createTextMessage("hello"));
+         }
+
+         org.apache.activemq.artemis.core.server.Queue node1Queue = node1.locateQueue(TEST_QUEUE);
+
+         Wait.assertEquals(10L, node1Queue::getMessageCount, 5000, 10);
+      }
+   }
+
+   /** Returns a tracked core locator pointing at node 1. */
+   private ServerLocator getNode1ServerLocator() throws Exception {
+      return createLocator(NODE_1_PORT);
+   }
+
+   /** Returns a tracked core locator pointing at node 2. */
+   private ServerLocator getNode2ServerLocator() throws Exception {
+      return createLocator(NODE_2_PORT);
+   }
+
+   /** Creates a locator for {@code tcp://localhost:<port>} and registers it through addServerLocator. */
+   private ServerLocator createLocator(int port) throws Exception {
+      return addServerLocator(ActiveMQClient.createServerLocator("tcp://localhost:" + port));
+   }
+
+   /** Creates (without starting) the cluster member "node_1", statically connected to "node2". */
+   private ActiveMQServer createNode1(MessageLoadBalancingType loadBalancingType) throws Exception {
+      return createClusterNode("node_1", NODE_1_PORT, "node1", "node2", loadBalancingType);
+   }
+
+   /** Creates (without starting) the cluster member "node_2", statically connected to "node1". */
+   private ActiveMQServer createNode2(MessageLoadBalancingType loadBalancingType) throws Exception {
+      return createClusterNode("node_2", NODE_2_PORT, "node2", "node1", loadBalancingType);
+   }
+
+   /**
+    * Creates one member of the two-node cluster; the broker is not started.
+    *
+    * @param identity          value used for both the server identity and the configuration name
+    * @param port              port handed to {@code createServer}
+    * @param localConnector    connector name this node uses for its cluster connection
+    * @param remoteConnector   connector name of the peer, used as the only static connector
+    * @param loadBalancingType message load balancing type of the cluster connection
+    * @return the configured server
+    */
+   private ActiveMQServer createClusterNode(String identity,
+                                            int port,
+                                            String localConnector,
+                                            String remoteConnector,
+                                            MessageLoadBalancingType loadBalancingType) throws Exception {
+      ActiveMQServer node = createServer(port, false);
+
+      ClusterConnectionConfiguration clusterConfiguration = new ClusterConnectionConfiguration()
+         .setName("cluster")
+         .setConnectorName(localConnector)
+         .setMessageLoadBalancingType(loadBalancingType)
+         .setStaticConnectors(Collections.singletonList(remoteConnector));
+
+      node.setIdentity(identity);
+      // Both connectors are registered on every node so that either name can be resolved
+      node.getConfiguration()
+         .setName(identity)
+         .setHAPolicyConfiguration(new LiveOnlyPolicyConfiguration())
+         .addConnectorConfiguration("node1", "tcp://localhost:" + NODE_1_PORT)
+         .addConnectorConfiguration("node2", "tcp://localhost:" + NODE_2_PORT)
+         .addClusterConfiguration(clusterConfiguration);
+
+      return node;
+   }
+
+   /**
+    * Builds a broker connection with a single durable mirror element that reconnects
+    * without limit (-1 attempts) every 100 ms.
+    *
+    * @param name       name of the broker connection
+    * @param targetPort local port of the broker to mirror to
+    */
+   private static AMQPBrokerConnectConfiguration createMirrorConnection(String name, int targetPort) {
+      return new AMQPBrokerConnectConfiguration(name, "tcp://localhost:" + targetPort)
+         .setReconnectAttempts(-1)
+         .setRetryInterval(100)
+         .addConnectionElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
+   }
+
+   /**
+    * Creates the non-auto-created test address, sets its redistribution delay to 0 and
+    * creates a durable ANYCAST queue of the same name on the given broker.
+    */
+   private void configureAddressAndQueue(ActiveMQServer node) throws Exception {
+      node.addAddressInfo(new AddressInfo(TEST_QUEUE).setAutoCreated(false));
+      node.getAddressSettingsRepository().addMatch(TEST_QUEUE, new AddressSettings().setRedistributionDelay(0));
+      node.createQueue(new QueueConfiguration(TEST_QUEUE).setAddress(TEST_QUEUE).setRoutingType(RoutingType.ANYCAST).setDurable(true));
+   }
+
+   /** Both protocols are needed: AMQP for mirroring and the JMS client, CORE for the cluster and core clients. */
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,CORE";
+   }
+}