This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d5cda320a ARTEMIS-5717 Fix for lost messages from conflict of message 
routing types
6d5cda320a is described below

commit 6d5cda320ad428a04bf7eb304f006532212381cf
Author: Timothy Bish <[email protected]>
AuthorDate: Mon Oct 27 16:26:24 2025 -0400

    ARTEMIS-5717 Fix for lost messages from conflict of message routing types
    
    When a message carries a routing type or a JMS destination type that 
conflicts
    with the actual type of the address the message gets routed to the message 
can
    be dropped with no matching routes which can lead to unexpected message 
loss.
    This can happen when a message sent to a MULTICAST address is sent to a DLA
    with an ANYCAST address type and then is sent to a DLA on a different broker
    via a bridge of via a federation queue receiver link as one simple example.
---
 .../activemq/artemis/api/core/RoutingType.java     |   8 ++
 .../protocol/amqp/broker/AMQPSessionCallback.java  |  44 +++---
 .../amqp/connect/AMQPBrokerConnection.java         |   3 +-
 .../bridge/AMQPBridgeFromAddressReceiver.java      |  10 +-
 .../bridge/AMQPBridgeFromQueueReceiver.java        |  10 +-
 .../AMQPFederationAddressConduitConsumer.java      |   2 +-
 .../federation/AMQPFederationAddressConsumer.java  |   7 +-
 .../federation/AMQPFederationQueueConsumer.java    |   9 +-
 .../amqp/proton/ProtonServerReceiverContext.java   | 148 +++++++++++++++------
 .../proton/ProtonServerReceiverContextTest.java    |   2 +-
 .../artemis/core/server/ServerSession.java         |  17 +++
 .../core/server/impl/ServerSessionImpl.java        | 110 ++++++++-------
 .../amqp/connect/AMQPBridgeServerToServerTest.java | 123 +++++++++++++++++
 .../connect/AMQPFederationServerToServerTest.java  |  77 +++++++++++
 .../amqp/connect/AMQPRoutingTypeMismatchTest.java  | 145 ++++++++++++++++++++
 15 files changed, 590 insertions(+), 125 deletions(-)

diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/RoutingType.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/RoutingType.java
index aefa16be5a..b1fb2e399a 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/RoutingType.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/RoutingType.java
@@ -35,4 +35,12 @@ public enum RoutingType {
          default -> null;
       };
    }
+
+   public static RoutingType getTypeOrDefault(byte type, RoutingType 
defaultType) {
+      return switch (type) {
+         case 0 -> MULTICAST;
+         case 1 -> ANYCAST;
+         default -> defaultType;
+      };
+   }
 }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 988ebdd5dc..3b1a9b73e9 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -518,36 +518,33 @@ public class AMQPSessionCallback implements 
SessionCallback {
                           Receiver receiver,
                           Delivery delivery,
                           SimpleString address,
+                          RoutingType routingType,
                           RoutingContext routingContext,
                           Message message) throws Exception {
 
       context.incrementSettle();
 
-      RoutingType routingType = null;
-      if (address != null) {
+      final SimpleString messageAddress = message.getAddressSimpleString();
+      final SimpleString selectedAddress;
+
+      if (!context.isAnonymousRelay()) {
          // set Fixed-address producer if the message.properties.to address 
differs from the producer
-         if (!address.toString().equals(message.getAddress())) {
+         if (!address.equals(messageAddress)) {
             message.setAddress(address);
          }
-         routingType = context.getDefRoutingType();
-      } else {
-         // Anonymous-relay producer, message must carry a To value
-         address = message.getAddressSimpleString();
-         if (address == null) {
-            // Errors are not currently handled as required by AMQP 1.0 
anonterm-v1.0
-            rejectMessage(context, delivery, Symbol.valueOf("failed"), 
"Missing 'to' field for message sent to an anonymous producer");
-            return;
-         }
 
-         routingType = message.getRoutingType();
-         if (routingType == null) {
-            routingType = context.getRoutingType(receiver, address);
-         }
+         selectedAddress = address;
+      } else if (messageAddress == null) {
+         // Anonymous-relay producer, message must carry a To value
+         // Errors are not currently handled as required by AMQP 1.0 
anonterm-v1.0
+         rejectMessage(context, delivery, Symbol.valueOf("failed"), "Missing 
'to' field for message sent to an anonymous producer");
+         return;
+      } else {
+         selectedAddress = messageAddress;
       }
 
-      //here check queue-autocreation
-      if (!checkAddressAndAutocreateIfPossible(address, routingType)) {
-         ActiveMQException e = 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(address.toString());
+      if (!checkAddressAndAutocreateIfPossible(selectedAddress, routingType)) {
+         ActiveMQException e = 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(selectedAddress.toString());
          if (transaction != null) {
             transaction.markAsRollbackOnly(e);
          }
@@ -557,7 +554,7 @@ public class AMQPSessionCallback implements SessionCallback 
{
       OperationContext oldcontext = recoverContext();
 
       try {
-         PagingStore store = 
manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString());
+         PagingStore store = 
manager.getServer().getPagingManager().getPageStore(selectedAddress);
          if (store != null && store.isRejectingMessages()) {
             // We drop pre-settled messages (and abort any associated Tx)
             String amqpAddress = delivery.getLink().getTarget().getAddress();
@@ -572,7 +569,7 @@ public class AMQPSessionCallback implements SessionCallback 
{
          } else {
             
message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer());
             // We need to transfer IO execution to a different thread 
otherwise we may deadlock netty loop
-            sessionExecutor.execute(() -> inSessionSend(context, transaction, 
message, delivery, receiver, routingContext));
+            sessionExecutor.execute(() -> inSessionSend(context, transaction, 
message, selectedAddress, routingType, delivery, receiver, routingContext));
          }
       } catch (Exception e) {
          onSendFailed(message, transaction, e);
@@ -604,19 +601,20 @@ public class AMQPSessionCallback implements 
SessionCallback {
 
          }
       });
-
    }
 
    private void inSessionSend(final ProtonServerReceiverContext context,
                               final Transaction transaction,
                               final Message message,
+                              final SimpleString address,
+                              final RoutingType routingType,
                               final Delivery delivery,
                               final Receiver receiver,
                               final RoutingContext routingContext) {
       OperationContext oldContext = recoverContext();
       try {
          if (invokeIncoming(message, (ActiveMQProtonRemotingConnection) 
transportConnection.getProtocolConnection()) == null) {
-            serverSession.send(transaction, message, directDeliver, 
receiver.getName(), false, routingContext);
+            serverSession.send(transaction, address, routingType, message, 
directDeliver, receiver.getName(), false, routingContext);
 
             afterIO(new IOCallback() {
                @Override
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
index 17d677bb29..1bb6b4ed8d 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
@@ -851,7 +851,8 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
                         public void initialize() throws Exception {
                            initialized = true;
                            address = SimpleString.of(target.getAddress());
-                           defRoutingType = 
getRoutingType(target.getCapabilities(), address);
+                           explicitRoutingType = queue.getRoutingType();
+                           implicitRoutingType = queue.getRoutingType();
 
                            try {
                               // Check if the queue that triggered the attach 
still exists or has it been removed
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromAddressReceiver.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromAddressReceiver.java
index e879368706..ae58e75ca8 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromAddressReceiver.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromAddressReceiver.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -329,10 +330,13 @@ public class AMQPBridgeFromAddressReceiver extends 
AMQPBridgeReceiver {
          }
 
          address = SimpleString.of(receiverInfo.getLocalAddress());
-         defRoutingType = receiverInfo.getRoutingType();
+         explicitRoutingType = 
getExplicitRoutingType(target.getCapabilities(), address);
+         implicitRoutingType = getImplicitRoutingType(address);
+
+         final RoutingType selectedRoutingType = explicitRoutingType != null ? 
explicitRoutingType : implicitRoutingType;
 
          try {
-            final AddressQueryResult result = sessionSPI.addressQuery(address, 
defRoutingType, false);
+            final AddressQueryResult result = sessionSPI.addressQuery(address, 
selectedRoutingType, false);
 
             // We initiated this link so the settings should refer to an 
address that definitely exists
             // however there is a chance the address was removed in the 
interim.
@@ -374,7 +378,7 @@ public class AMQPBridgeFromAddressReceiver extends 
AMQPBridgeReceiver {
                             transformer, message, theMessage);
             }
 
-            sessionSPI.serverSend(this, tx, receiver, delivery, cachedAddress, 
routingContext, theMessage);
+            sessionSPI.serverSend(this, tx, receiver, delivery, cachedAddress, 
getPreferredRoutingType(), routingContext, theMessage);
          } catch (Exception e) {
             logger.warn("Inbound delivery for {} encountered an error: {}", 
receiverInfo, e.getMessage(), e);
             deliveryFailed(delivery, receiver, e);
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromQueueReceiver.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromQueueReceiver.java
index 2ecdb732fc..766cb3de11 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromQueueReceiver.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromQueueReceiver.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
@@ -298,10 +299,13 @@ public class AMQPBridgeFromQueueReceiver extends 
AMQPBridgeReceiver {
          }
 
          address = SimpleString.of(receiverInfo.getLocalQueue());
-         defRoutingType = receiverInfo.getRoutingType();
+         explicitRoutingType = 
getExplicitRoutingType(target.getCapabilities(), address);
+         implicitRoutingType = getImplicitRoutingType(address);
+
+         final RoutingType selectedRoutingType = explicitRoutingType != null ? 
explicitRoutingType : implicitRoutingType;
 
          try {
-            final QueueQueryResult result = sessionSPI.queueQuery(address, 
defRoutingType, false);
+            final QueueQueryResult result = sessionSPI.queueQuery(address, 
selectedRoutingType, false);
 
             // We initiated this link so the settings should refer to an queue 
that definitely exists
             // however there is a chance the address was removed in the 
interim.
@@ -343,7 +347,7 @@ public class AMQPBridgeFromQueueReceiver extends 
AMQPBridgeReceiver {
                             transformer, message, theMessage);
             }
 
-            sessionSPI.serverSend(this, tx, receiver, delivery, cachedFqqn, 
routingContext, theMessage);
+            sessionSPI.serverSend(this, tx, receiver, delivery, cachedFqqn, 
getPreferredRoutingType(), routingContext, theMessage);
          } catch (Exception e) {
             logger.warn("Inbound delivery for {} encountered an error: {}", 
receiverInfo, e.getMessage(), e);
             deliveryFailed(delivery, receiver, e);
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConduitConsumer.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConduitConsumer.java
index 56d70fae43..80500041e5 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConduitConsumer.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConduitConsumer.java
@@ -67,7 +67,7 @@ public final class AMQPFederationAddressConduitConsumer 
extends AMQPFederationAd
 
       @Override
       protected void routeFederatedMessage(Message message, Delivery delivery, 
Receiver receiver, Transaction tx) throws Exception {
-         sessionSPI.serverSend(this, tx, receiver, delivery, cachedAddress, 
routingContext, message);
+         sessionSPI.serverSend(this, tx, receiver, delivery, cachedAddress, 
getPreferredRoutingType(), routingContext, message);
       }
    }
 }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
index 272d7dd048..483e9729cd 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
@@ -382,10 +382,13 @@ public abstract class AMQPFederationAddressConsumer 
extends AMQPFederationConsum
          }
 
          address = SimpleString.of(target.getAddress());
-         defRoutingType = getRoutingType(target.getCapabilities(), address);
+         explicitRoutingType = 
getExplicitRoutingType(target.getCapabilities(), address);
+         implicitRoutingType = getImplicitRoutingType(address);
+
+         final RoutingType selectedRoutingType = explicitRoutingType != null ? 
explicitRoutingType : implicitRoutingType;
 
          try {
-            final AddressQueryResult result = sessionSPI.addressQuery(address, 
defRoutingType, false);
+            final AddressQueryResult result = sessionSPI.addressQuery(address, 
selectedRoutingType, false);
 
             // We initiated this link so the target should refer to an address 
that definitely exists
             // however there is a chance the address was removed in the 
interim.
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
index 06dd708fa6..b80b0d218e 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
@@ -295,10 +295,13 @@ public final class AMQPFederationQueueConsumer extends 
AMQPFederationConsumer {
          }
 
          address = SimpleString.of(target.getAddress());
-         defRoutingType = getRoutingType(target.getCapabilities(), address);
+         explicitRoutingType = 
getExplicitRoutingType(target.getCapabilities(), address);
+         implicitRoutingType = getImplicitRoutingType(address);
+
+         final RoutingType selectedRoutingType = explicitRoutingType != null ? 
explicitRoutingType : implicitRoutingType;
 
          try {
-            final QueueQueryResult result = sessionSPI.queueQuery(address, 
defRoutingType, false);
+            final QueueQueryResult result = sessionSPI.queueQuery(address, 
selectedRoutingType, false);
 
             // We initiated this link so the target should refer to an queue 
that definitely exists
             // however there is a chance the queue was removed in the interim.
@@ -345,7 +348,7 @@ public final class AMQPFederationQueueConsumer extends 
AMQPFederationConsumer {
             }
 
             signalPluginBeforeFederationConsumerMessageHandled(theMessage);
-            sessionSPI.serverSend(this, tx, receiver, delivery, cachedFqqn, 
routingContext, theMessage);
+            sessionSPI.serverSend(this, tx, receiver, delivery, cachedFqqn, 
getPreferredRoutingType(), routingContext, theMessage);
             signalPluginAfterFederationConsumerMessageHandled(theMessage);
          } catch (Exception e) {
             logger.warn("Inbound delivery for {} encountered an error: {}", 
consumerInfo, e.getMessage(), e);
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index dae8fe135c..624e0d18ae 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -30,6 +30,7 @@ import 
org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
 import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
 import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
 import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
@@ -39,6 +40,7 @@ import 
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMess
 import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Target;
 import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
@@ -57,7 +59,8 @@ public class ProtonServerReceiverContext extends 
ProtonAbstractReceiver {
    protected SimpleString lastAddress;
    protected AddressFullMessagePolicy lastAddressPolicy;
    protected boolean addressAlreadyClashed = false;
-   protected RoutingType defRoutingType;
+   protected RoutingType explicitRoutingType;
+   protected RoutingType implicitRoutingType;
 
    public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI,
                                       AMQPConnectionContext connection,
@@ -83,11 +86,13 @@ public class ProtonServerReceiverContext extends 
ProtonAbstractReceiver {
             // if dynamic we have to create the node (queue) and set the 
address on the target, the node is temporary and
             // will be deleted on closing of the session
             address = SimpleString.of(sessionSPI.tempQueueName());
-            defRoutingType = getRoutingType(target.getCapabilities(), address);
+
+            explicitRoutingType = 
getExplicitRoutingType(target.getCapabilities(), address);
+            implicitRoutingType = getImplicitRoutingType(address);
 
             try {
-               if (defRoutingType == RoutingType.ANYCAST) {
-                  sessionSPI.createTemporaryQueue(address, defRoutingType);
+               if (getPreferredRoutingType() == RoutingType.ANYCAST) {
+                  sessionSPI.createTemporaryQueue(address, 
explicitRoutingType);
                } else {
                   sessionSPI.createTemporaryAddress(address);
                }
@@ -109,9 +114,13 @@ public class ProtonServerReceiverContext extends 
ProtonAbstractReceiver {
             }
 
             if (address != null) {
-               defRoutingType = getRoutingType(target.getCapabilities(), 
address);
+               explicitRoutingType = 
getExplicitRoutingType(target.getCapabilities(), address);
+               implicitRoutingType = getImplicitRoutingType(address);
+
+               final RoutingType selectedRoutingType = explicitRoutingType != 
null ? explicitRoutingType : implicitRoutingType;
+
                try {
-                  if (!sessionSPI.checkAddressAndAutocreateIfPossible(address, 
defRoutingType)) {
+                  if (!sessionSPI.checkAddressAndAutocreateIfPossible(address, 
selectedRoutingType)) {
                      throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(address.toString());
                   }
                } catch (ActiveMQAMQPNotFoundException e) {
@@ -130,6 +139,8 @@ public class ProtonServerReceiverContext extends 
ProtonAbstractReceiver {
                } catch (ActiveMQSecurityException e) {
                   throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingProducer(e.getMessage());
                }
+            } else {
+               explicitRoutingType = 
getExplicitRoutingType(target.getCapabilities(), null);
             }
          }
 
@@ -162,51 +173,91 @@ public class ProtonServerReceiverContext extends 
ProtonAbstractReceiver {
       return address != null ? address : lastAddress;
    }
 
-   public RoutingType getDefRoutingType() {
-      return defRoutingType;
+   /**
+    * {@return true if the context is an anonymous relay (no target address)}
+    */
+   public final boolean isAnonymousRelay() {
+      return address == null;
    }
 
-   public RoutingType getRoutingType(Receiver receiver, SimpleString address) {
-      org.apache.qpid.proton.amqp.messaging.Target target = 
(org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
-      return target != null ? getRoutingType(target.getCapabilities(), 
address) : getRoutingType((Symbol[]) null, address);
+   /**
+    * Return the explicit routing type if one was provided and if not then 
return the
+    * implicit routing type as a fallback.
+    *
+    * @return the highest priority routing type this receiver selected.
+    */
+   public RoutingType getPreferredRoutingType() {
+      return explicitRoutingType != null ? explicitRoutingType : 
implicitRoutingType;
    }
 
-   protected final RoutingType getRoutingType(Symbol[] symbols, SimpleString 
address) {
-      RoutingType explicitRoutingType = getExplicitRoutingType(symbols);
-      if (explicitRoutingType != null) {
-         return explicitRoutingType;
-      } else {
-         final AddressInfo addressInfo = sessionSPI.getAddress(address);
-         /*
-          * If we're dealing with an *existing* address that has just one 
routing-type simply use that.
-          * This allows "bare" AMQP clients (which have no built-in routing 
semantics) to send messages
-          * wherever they want in this case because the routing ambiguity is 
eliminated.
-          */
-         if (addressInfo != null && addressInfo.getRoutingTypes().size() == 1) 
{
-            return addressInfo.getRoutingType();
-         } else {
-            return getDefaultRoutingType(sessionSPI, address);
-         }
-      }
+   /**
+    * Returns the routing type the client defined for this receiver using 
capabilities set
+    * in the link {@link Target} or an address prefix.
+    *
+    * @return the explicit routing type defined in the link target 
capabilities or via address prefix.
+    */
+   public RoutingType getExplicitRoutingType() {
+      return explicitRoutingType;
+   }
+
+   /**
+    * Returns the routing type of the actual link target (address or queue) if 
one could
+    * be determined based on broker defined defaults.
+    *
+    * @return the implicit routing type based on the target address.
+    */
+   public RoutingType getImplicitRoutingType() {
+      return implicitRoutingType;
    }
 
    @Override
    protected void actualDelivery(Message message, Delivery delivery, 
DeliveryAnnotations deliveryAnnotations, Receiver receiver, Transaction tx) {
       try {
-         if (sessionSPI != null) {
-            // message could be null on unit tests (Mocking from 
ProtonServerReceiverContextTest).
-            if (address == null && message != null) {
-               validateAddressOnAnonymousLink(message);
-            }
-            sessionSPI.serverSend(this, tx, receiver, delivery, address, 
routingContext, message);
+         if (sessionSPI == null) {
+            return;
+         }
+
+         if (isAnonymousRelay()) {
+            validateAddressOnAnonymousLink(message);
          }
+
+         sessionSPI.serverSend(this, tx, receiver, delivery, address, 
deduceRoutingType(message), routingContext, message);
       } catch (Exception e) {
          logger.warn(e.getMessage(), e);
-
          deliveryFailed(delivery, receiver, e);
       }
    }
 
+   protected RoutingType deduceRoutingType(Message message) {
+      RoutingType routingType = getExplicitRoutingType();
+
+      if (routingType == null) {
+         if (isAnonymousRelay()) {
+            final SimpleString address = message.getAddressSimpleString();
+
+            routingType = message.getRoutingType();
+
+            if (routingType == null && address != null) {
+               routingType = sessionSPI.getRoutingTypeFromPrefix(address, 
getImplicitRoutingType(address));
+            }
+         } else {
+            final Object msgRoutingType = 
message.getAnnotation(SimpleString.of(AMQPMessageSupport.ROUTING_TYPE.toString()));
+
+            // This can lead to message being dropped on a link that matches 
an address whose
+            // routing type is MULTICAST but the messages get sent with the 
ANYCAST routing type
+            // or vice versa but we supported this sort of thing and changing 
if results in changes
+            // in behavior from past releases.
+            if (msgRoutingType != null) {
+               routingType = RoutingType.getTypeOrDefault(((Number) 
msgRoutingType).byteValue(), getImplicitRoutingType());
+            } else {
+               routingType = getImplicitRoutingType();
+            }
+         }
+      }
+
+      return routingType;
+   }
+
    private void validateAddressOnAnonymousLink(Message message) throws 
Exception {
       SimpleString newAddress = message.getAddressSimpleString();
       if (newAddress != null && !newAddress.equals(lastAddress)) {
@@ -219,8 +270,8 @@ public class ProtonServerReceiverContext extends 
ProtonAbstractReceiver {
 
             logger.debug("AddressFullPolicy clash between {}/{} and {}/{}", 
lastAddress, lastAddressPolicy, newAddress, lastAddressPolicy);
          }
-         this.lastAddress = message.getAddressSimpleString();
-         this.lastAddressPolicy = currentPolicy;
+         lastAddress = message.getAddressSimpleString();
+         lastAddressPolicy = currentPolicy;
       }
    }
 
@@ -243,11 +294,25 @@ public class ProtonServerReceiverContext extends 
ProtonAbstractReceiver {
       }
    }
 
-   private static RoutingType getDefaultRoutingType(AMQPSessionCallback 
sessionSPI, SimpleString address) {
-      return 
Objects.requireNonNullElse(sessionSPI.getRoutingTypeFromPrefix(address, 
sessionSPI.getDefaultRoutingType(address)), 
ActiveMQDefaultConfiguration.getDefaultRoutingType());
+   protected final RoutingType getImplicitRoutingType(SimpleString address) {
+      final AddressInfo addressInfo = sessionSPI.getAddress(address);
+      /*
+       * If we're dealing with an *existing* address that has just one 
routing-type simply use that.
+       * This allows "bare" AMQP clients (which have no built-in routing 
semantics) to send messages
+       * wherever they want in this case because the routing ambiguity is 
eliminated.
+       */
+      if (addressInfo != null) {
+         if (addressInfo.getRoutingTypes().size() == 1) {
+            return addressInfo.getRoutingType();
+         } else {
+            return null; // We don't assume which type the client wanted, we 
just route it.
+         }
+      } else {
+         return 
Objects.requireNonNullElse(sessionSPI.getDefaultRoutingType(address), 
ActiveMQDefaultConfiguration.getDefaultRoutingType());
+      }
    }
 
-   private static RoutingType getExplicitRoutingType(Symbol[] symbols) {
+   protected final RoutingType getExplicitRoutingType(Symbol[] symbols, 
SimpleString address) {
       if (symbols != null) {
          for (Symbol symbol : symbols) {
             if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || 
AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) {
@@ -257,6 +322,7 @@ public class ProtonServerReceiverContext extends 
ProtonAbstractReceiver {
             }
          }
       }
-      return null;
+
+      return address != null ? sessionSPI.getRoutingTypeFromPrefix(address, 
null) : null;
    }
 }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
index e6cc880044..1fc53f6224 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
@@ -235,7 +235,7 @@ public class ProtonServerReceiverContextTest {
       source.setDefaultOutcome(defaultOutcome);
       when(mockReceiver.getSource()).thenReturn(source);
 
-      doThrow(deliveryException).when(mockSession).serverSend(eq(rc), 
nullable(Transaction.class), eq(mockReceiver), eq(mockDelivery), 
nullable(SimpleString.class), any(RoutingContext.class), 
nullable(AMQPMessage.class));
+      doThrow(deliveryException).when(mockSession).serverSend(eq(rc), 
nullable(Transaction.class), eq(mockReceiver), eq(mockDelivery), 
nullable(SimpleString.class), any(), any(RoutingContext.class), 
nullable(AMQPMessage.class));
 
       rc.onMessage(mockDelivery);
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index cc281bba45..92ec11ca4e 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -348,6 +348,14 @@ public interface ServerSession extends SecurityAuth {
                       boolean noAutoCreateQueue,
                       RoutingContext routingContext) throws Exception;
 
+   RoutingStatus send(Transaction tx,
+                      SimpleString address,
+                      RoutingType routingType,
+                      Message message,
+                      boolean direct,
+                      String senderName,
+                      boolean noAutoCreateQueue,
+                      RoutingContext routingContext) throws Exception;
 
    RoutingStatus doSend(Transaction tx,
                         Message msg,
@@ -364,6 +372,15 @@ public interface ServerSession extends SecurityAuth {
                         boolean noAutoCreateQueue,
                         RoutingContext routingContext) throws Exception;
 
+   RoutingStatus doSend(Transaction tx,
+                        RoutingType routingType,
+                        Message msg,
+                        SimpleString originalAddress,
+                        boolean direct,
+                        String senderName,
+                        boolean noAutoCreateQueue,
+                        RoutingContext routingContext) throws Exception;
+
    RoutingStatus send(Message message, boolean direct, String senderName, 
boolean noAutoCreateQueue) throws Exception;
 
    RoutingStatus send(Message message, boolean direct, String senderName) 
throws Exception;
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 97ba38bd8d..880dbcba0c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1967,24 +1967,36 @@ public class ServerSessionImpl extends 
CriticalComponentImpl implements ServerSe
 
    @Override
    public synchronized RoutingStatus send(Transaction tx,
-                                          Message msg,
+                                          Message message,
                                           final boolean direct,
                                           final String senderName,
                                           boolean noAutoCreateQueue) throws 
Exception {
-      return send(tx, msg, direct, senderName, noAutoCreateQueue, 
routingContext);
+      return send(tx, message, direct, senderName, noAutoCreateQueue, 
routingContext);
    }
 
    @Override
    public synchronized RoutingStatus send(Transaction tx,
-                                          Message messageParameter,
+                                          Message message,
                                           final boolean direct,
                                           final String senderName,
                                           boolean noAutoCreateQueue,
                                           RoutingContext routingContext) 
throws Exception {
-      final Message message = 
LargeServerMessageImpl.checkLargeMessage(messageParameter, storageManager);
+      return send(tx, message.getAddressSimpleString(), 
message.getRoutingType(), message, direct, senderName, noAutoCreateQueue, 
routingContext);
+   }
+
+   @Override
+   public synchronized RoutingStatus send(Transaction tx,
+                                          SimpleString address,
+                                          RoutingType routingType,
+                                          Message msg,
+                                          final boolean direct,
+                                          final String senderName,
+                                          boolean noAutoCreateQueue,
+                                          RoutingContext routingContext) 
throws Exception {
+      final Message theMessage = LargeServerMessageImpl.checkLargeMessage(msg, 
storageManager);
 
       if (server.hasBrokerMessagePlugins()) {
-         server.callBrokerMessagePlugins(plugin -> plugin.beforeSend(this, tx, 
message, direct, noAutoCreateQueue));
+         server.callBrokerMessagePlugins(plugin -> plugin.beforeSend(this, tx, 
theMessage, direct, noAutoCreateQueue));
       }
 
       final RoutingStatus result;
@@ -2000,45 +2012,46 @@ public class ServerSessionImpl extends 
CriticalComponentImpl implements ServerSe
 
          //large message may come from StompSession directly, in which
          //case the id header already generated.
-         if (!message.isLargeMessage()) {
+         if (!theMessage.isLargeMessage()) {
             long id = storageManager.generateID();
             // This will re-encode the message
-            message.setMessageID(id);
+            theMessage.setMessageID(id);
          }
 
-         SimpleString address = message.getAddressSimpleString();
-
          if (defaultAddress == null && address != null) {
             defaultAddress = address;
          }
 
          if (address == null) {
             // We don't want to force a re-encode when the message gets sent 
to the consumer
-            message.setAddress(defaultAddress);
+            theMessage.setAddress(defaultAddress);
+         } else {
+            // We need to carry forward the chosen address from the caller to 
the later send stages.
+            theMessage.setAddress(address);
          }
 
          if (logger.isTraceEnabled()) {
-            logger.trace("send(message={}, direct={}) being called", message, 
direct);
+            logger.trace("send(message={}, direct={}) being called", 
theMessage, direct);
          }
 
-         if (message.getAddress() == null) {
+         if (theMessage.getAddress() == null) {
             // This could happen with some tests that are ignoring messages
             throw ActiveMQMessageBundle.BUNDLE.noAddress();
          }
 
-         if (message.getAddressSimpleString().equals(managementAddress)) {
+         if (theMessage.getAddressSimpleString().equals(managementAddress)) {
             // It's a management message
 
-            result = handleManagementMessage(tx, message, direct);
+            result = handleManagementMessage(tx, theMessage, direct);
          } else {
             try {
-               result = doSend(tx, message, address, direct, senderName, 
noAutoCreateQueue, routingContext);
+               result = doSend(tx, routingType, theMessage, address, direct, 
senderName, noAutoCreateQueue, routingContext);
             } catch (ActiveMQIOErrorException e) {
                if (tx != null) {
                   tx.markAsRollbackOnly(e);
                }
-               if (message.isLargeMessage()) {
-                  ((LargeServerMessage)message).deleteFile();
+               if (theMessage.isLargeMessage()) {
+                  ((LargeServerMessage)theMessage).deleteFile();
                }
                throw e;
             }
@@ -2046,31 +2059,31 @@ public class ServerSessionImpl extends 
CriticalComponentImpl implements ServerSe
 
          if (AuditLogger.isMessageLoggingEnabled()) {
             if (tx != null && !autoCommitSends) {
-               
AuditLogger.addSendToTransaction(remotingConnection.getSubject(), 
remotingConnection.getRemoteAddress(), message.toString(), tx.toString());
+               
AuditLogger.addSendToTransaction(remotingConnection.getSubject(), 
remotingConnection.getRemoteAddress(), theMessage.toString(), tx.toString());
                tx.addOperation(new TransactionOperationAbstract() {
                   @Override
                   public void afterCommit(Transaction tx) {
-                     auditLogSend(message, tx);
+                     auditLogSend(theMessage, tx);
                   }
 
                   @Override
                   public void afterRollback(Transaction tx) {
-                     
AuditLogger.rolledBackTransaction(remotingConnection.getSubject(), 
remotingConnection.getRemoteAddress(), tx.toString(), message.toString());
+                     
AuditLogger.rolledBackTransaction(remotingConnection.getSubject(), 
remotingConnection.getRemoteAddress(), tx.toString(), theMessage.toString());
                   }
                });
             } else {
-               auditLogSend(message, null);
+               auditLogSend(theMessage, null);
             }
          }
 
       } catch (Exception e) {
          if (server.hasBrokerMessagePlugins()) {
-            server.callBrokerMessagePlugins(plugin -> 
plugin.onSendException(this, tx, message, direct, noAutoCreateQueue, e));
+            server.callBrokerMessagePlugins(plugin -> 
plugin.onSendException(this, tx, theMessage, direct, noAutoCreateQueue, e));
          }
          throw e;
       }
       if (server.hasBrokerMessagePlugins()) {
-         server.callBrokerMessagePlugins(plugin -> plugin.afterSend(this, 
autoCommitSends ? null : tx, message, direct, noAutoCreateQueue, result));
+         server.callBrokerMessagePlugins(plugin -> plugin.afterSend(this, 
autoCommitSends ? null : tx, theMessage, direct, noAutoCreateQueue, result));
       }
       return result;
    }
@@ -2387,7 +2400,6 @@ public class ServerSessionImpl extends 
CriticalComponentImpl implements ServerSe
       return doSend(tx, msg, originalAddress, direct, senderName, 
noAutoCreateQueue, routingContext);
    }
 
-
    @Override
    public synchronized RoutingStatus doSend(final Transaction tx,
                                             final Message msg,
@@ -2396,30 +2408,34 @@ public class ServerSessionImpl extends 
CriticalComponentImpl implements ServerSe
                                             final String senderName,
                                             final boolean noAutoCreateQueue,
                                             final RoutingContext 
routingContext) throws Exception {
+      return doSend(tx, msg.getRoutingType(), msg, originalAddress, direct, 
senderName, noAutoCreateQueue, routingContext);
+   }
 
-      RoutingStatus result = RoutingStatus.OK;
+   @Override
+   public synchronized RoutingStatus doSend(final Transaction tx,
+                                            final RoutingType routingType,
+                                            final Message message,
+                                            final SimpleString originalAddress,
+                                            final boolean direct,
+                                            final String senderName,
+                                            final boolean noAutoCreateQueue,
+                                            final RoutingContext 
routingContext) throws Exception {
 
-      RoutingType routingType = msg.getRoutingType();
+      RoutingStatus result = RoutingStatus.OK;
 
-         /* TODO-now: How to address here with AMQP?
-         if (originalAddress != null) {
-            if (originalAddress.toString().startsWith("anycast:")) {
-               routingType = RoutingType.ANYCAST;
-            } else if (originalAddress.toString().startsWith("multicast:")) {
-               routingType = RoutingType.MULTICAST;
-            }
-         } */
+      final AddressInfo targetFromMessage = new 
AddressInfo(message.getAddressSimpleString(), routingType);
+      final AddressInfo art = getAddressAndRoutingType(targetFromMessage);
 
-      final AddressInfo targetFromMessage = new 
AddressInfo(msg.getAddressSimpleString(), routingType);
-      AddressInfo art = getAddressAndRoutingType(targetFromMessage);
       if (art != targetFromMessage) {
          // remove the prefix from the message, with the address model change, 
only non prefixed addresses exist on the broker
-         msg.setAddress(art.getName());
+         message.setAddress(art.getName());
       }
 
+      final SimpleString messageAddress = message.getAddressSimpleString();
+
       // check the user has write access to this address (and potentially 
queue).
       try {
-         
securityCheck(CompositeAddress.extractAddressName(msg.getAddressSimpleString()),
 CompositeAddress.isFullyQualified(msg.getAddressSimpleString()) ? 
CompositeAddress.extractQueueName(msg.getAddressSimpleString()) : null, 
CheckType.SEND, this);
+         securityCheck(CompositeAddress.extractAddressName(messageAddress), 
CompositeAddress.isFullyQualified(messageAddress) ? 
CompositeAddress.extractQueueName(messageAddress) : null, CheckType.SEND, this);
       } catch (ActiveMQException e) {
          if (!autoCommitSends && tx != null) {
             tx.markAsRollbackOnly(e);
@@ -2428,16 +2444,16 @@ public class ServerSessionImpl extends 
CriticalComponentImpl implements ServerSe
       }
 
       if (server.getConfiguration().isPopulateValidatedUser() && validatedUser 
!= null) {
-         msg.setValidatedUserID(validatedUser);
+         message.setValidatedUserID(validatedUser);
       }
 
-      if (server.getConfiguration().isRejectEmptyValidatedUser() && 
msg.getValidatedUserID() == null) {
+      if (server.getConfiguration().isRejectEmptyValidatedUser() && 
message.getValidatedUserID() == null) {
          throw ActiveMQMessageBundle.BUNDLE.rejectEmptyValidatedUser();
       }
 
-      if 
(server.getAddressSettingsRepository().getMatch(msg.getAddress()).isEnableIngressTimestamp())
 {
-         msg.setIngressTimestamp();
-         msg.reencode();
+      if 
(server.getAddressSettingsRepository().getMatch(message.getAddress()).isEnableIngressTimestamp())
 {
+         message.setIngressTimestamp();
+         message.reencode();
       }
 
       if (tx == null || autoCommitSends) {
@@ -2452,13 +2468,13 @@ public class ServerSessionImpl extends 
CriticalComponentImpl implements ServerSe
 
          // Retrieve message size for metrics update before routing,
          // since large message backing files may be closed once routing 
completes
-         int mSize = msg instanceof LargeServerMessageImpl lsmi ? 
lsmi.getBodyBufferSize() : msg.getEncodeSize();
+         int mSize = message instanceof LargeServerMessageImpl lsmi ? 
lsmi.getBodyBufferSize() : message.getEncodeSize();
 
-         result = postOffice.route(msg, routingContext, direct);
+         result = postOffice.route(message, routingContext, direct);
 
-         logger.debug("Routing result for {} = {}", msg, result);
+         logger.debug("Routing result for {} = {}", message, result);
 
-         updateProducerMetrics(msg, senderName, mSize);
+         updateProducerMetrics(message, senderName, mSize);
       } finally {
          if (!routingContext.isReusable()) {
             routingContext.clear();
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java
index 4d9ecc8e9a..05a8c0cc04 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java
@@ -19,6 +19,7 @@ package 
org.apache.activemq.artemis.tests.integration.amqp.connect;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PREFER_SHARED_DURABLE_SUBSCRIPTIONS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PULL_RECEIVER_BATCH_SIZE;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_CREDITS;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DISABLE_RECEIVER_DEMAND_TRACKING;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -31,6 +32,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
@@ -60,6 +62,11 @@ import 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import 
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
 import org.apache.activemq.artemis.tests.util.CFUtil;
 import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.RepeatedTest;
@@ -1606,4 +1613,120 @@ class AMQPBridgeServerToServerTest extends 
AmqpClientTestSupport {
          }
       }
    }
+
+   @Test
+   @Timeout(20)
+   public void 
testBridgeToRemoteDLQThatHasMessagesSentToMulticastAddressOriginially() throws 
Exception {
+      logger.info("Test started: {}", getTestName());
+
+      final AMQPBridgeQueuePolicyElement bridgePolicy = new 
AMQPBridgeQueuePolicyElement();
+      bridgePolicy.setName("drain-remote-dlq-policy");
+      bridgePolicy.addToIncludes(getDeadLetterAddress(), 
getDeadLetterAddress());
+      bridgePolicy.setRemoteAddress(getDeadLetterAddress() + "::" + 
getDeadLetterAddress()); // Direct target the desired FQQN
+
+      final AMQPBridgeBrokerConnectionElement element = new 
AMQPBridgeBrokerConnectionElement();
+      element.setName(getTestName());
+      element.addBridgeToQueuePolicy(bridgePolicy);
+
+      final AMQPBrokerConnectConfiguration amqpConnection =
+         new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" 
+ SERVER_PORT_REMOTE);
+      amqpConnection.setReconnectAttempts(10); // Limit reconnects
+      amqpConnection.setRetryInterval(50);
+      amqpConnection.addElement(element);
+
+      remoteServer.start();
+      server.getConfiguration().addAMQPConnection(amqpConnection);
+      server.start();
+
+      // Configure defaults
+      createAddressAndQueues(remoteServer);
+      createAddressAndQueues(server);
+
+      final AmqpClient client = createAmqpClient();
+      final AmqpConnection clientConnection = addConnection(client.connect());
+      final AmqpSession clientSession = clientConnection.createSession();
+      final AmqpReceiver receiver = 
clientSession.createReceiver(getTopicName());
+
+      receiver.flow(1);
+
+      final ConnectionFactory factoryLocal = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT);
+
+      try (Connection connection = factoryLocal.createConnection()) {
+         connection.setClientID("test-brigdes");
+         connection.start();
+
+         final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+         final Topic topic = session.createTopic(getTopicName());
+         final MessageProducer producer = session.createProducer(topic);
+
+         // Sends a Topic message with Qpid JMS so that source message has the 
AMQP JMS mapping headers.
+         producer.send(session.createTextMessage("Message:"));
+      }
+
+      final AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(received);
+      received.reject(); // Terminal outcome should be DLQ'd
+
+      Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of(getDeadLetterAddress())).isExists(), 1_000);
+      Wait.assertTrue(() -> 
remoteServer.queueQuery(SimpleString.of(getDeadLetterAddress())).isExists(), 
1_000);
+      Wait.assertEquals(1L, () -> 
remoteServer.queueQuery(SimpleString.of(getDeadLetterAddress())).getMessageCount(),
 5_000, 10);
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testBridgeFromRemoteDLQThatHasMessagesSentToMulticastAddressOriginially() 
throws Exception {
+      logger.info("Test started: {}", getTestName());
+
+      final AMQPBridgeQueuePolicyElement bridgePolicy = new 
AMQPBridgeQueuePolicyElement();
+      bridgePolicy.setName("drain-remote-dlq-policy");
+      bridgePolicy.addToIncludes(getDeadLetterAddress(), 
getDeadLetterAddress());
+      bridgePolicy.addProperty(DISABLE_RECEIVER_DEMAND_TRACKING, "true");
+
+      final AMQPBridgeBrokerConnectionElement element = new 
AMQPBridgeBrokerConnectionElement();
+      element.setName("From-Server1-DLQ");
+      element.addBridgeFromQueuePolicy(bridgePolicy);
+
+      final AMQPBrokerConnectConfiguration amqpConnection =
+         new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" 
+ SERVER_PORT);
+      amqpConnection.setReconnectAttempts(10); // Limit reconnects
+      amqpConnection.setRetryInterval(50);
+      amqpConnection.addElement(element);
+
+      remoteServer.getConfiguration().addAMQPConnection(amqpConnection);
+      remoteServer.start();
+      server.start();
+
+      // Configure defaults
+      createAddressAndQueues(remoteServer);
+      createAddressAndQueues(server);
+
+      final AmqpClient client = createAmqpClient();
+      final AmqpConnection clientConnection = addConnection(client.connect());
+      final AmqpSession clientSession = clientConnection.createSession();
+      final AmqpReceiver receiver = 
clientSession.createReceiver(getTopicName());
+
+      receiver.flow(1);
+
+      final ConnectionFactory factoryLocal = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT);
+
+      try (Connection connection = factoryLocal.createConnection()) {
+         connection.setClientID("test-brigdes");
+         connection.start();
+
+         final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+         final Topic topic = session.createTopic(getTopicName());
+         final MessageProducer producer = session.createProducer(topic);
+
+         // Sends a Topic message with Qpid JMS so that source message has the 
AMQP JMS mapping headers.
+         producer.send(session.createTextMessage("Message:"));
+      }
+
+      final AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(received);
+      received.reject(); // Terminal outcome should be DLQ'd
+
+      Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of(getDeadLetterAddress())).isExists(), 1_000);
+      Wait.assertTrue(() -> 
remoteServer.queueQuery(SimpleString.of(getDeadLetterAddress())).isExists(), 
1_000);
+      Wait.assertEquals(1L, () -> 
remoteServer.queueQuery(SimpleString.of(getDeadLetterAddress())).getMessageCount(),
 5_000, 10);
+   }
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
index 268a507b2a..693799da18 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
@@ -63,6 +64,11 @@ import 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import 
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
 import org.apache.activemq.artemis.tests.util.CFUtil;
 import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.RepeatedTest;
@@ -2083,4 +2089,75 @@ public class AMQPFederationServerToServerTest extends 
AmqpClientTestSupport {
          Wait.assertTrue(() -> remoteServer.bindingQuery(addressName, 
false).getQueueNames().size() == 0, 10_000, 50);
       }
    }
+
+   @Test
+   @Timeout(20)
+   public void 
testFederateFromRemoteDLQThatHasMessagesSentToMulticastAddressOriginially() 
throws Exception {
+      logger.info("Test started: {}", getTestName());
+
+      final AMQPFederationQueuePolicyElement queuePolicy = new 
AMQPFederationQueuePolicyElement();
+      queuePolicy.setName("remote-dlq-policy");
+      queuePolicy.addToIncludes(getDeadLetterAddress(), 
getDeadLetterAddress());
+
+      final AMQPFederatedBrokerConnectionElement element = new 
AMQPFederatedBrokerConnectionElement();
+      element.setName("From-Server1-DLQ");
+      element.addLocalQueuePolicy(queuePolicy);
+
+      final AMQPBrokerConnectConfiguration amqpConnection =
+         new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" 
+ SERVER_PORT);
+      amqpConnection.setReconnectAttempts(10); // Limit reconnects
+      amqpConnection.setRetryInterval(50);
+      amqpConnection.addElement(element);
+
+      remoteServer.getConfiguration().addAMQPConnection(amqpConnection);
+      remoteServer.start();
+      server.start();
+
+      // Configure defaults
+      createAddressAndQueues(remoteServer);
+      createAddressAndQueues(server);
+
+      final AmqpClient client = createAmqpClient();
+      final AmqpConnection clientConnection = addConnection(client.connect());
+      final AmqpSession clientSession = clientConnection.createSession();
+      final AmqpReceiver receiver = 
clientSession.createReceiver(getTopicName());
+
+      receiver.flow(1);
+
+      final ConnectionFactory factoryLocal = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT);
+
+      try (Connection connection = factoryLocal.createConnection()) {
+         connection.setClientID("test-federation");
+         connection.start();
+
+         final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+         final Topic topic = session.createTopic(getTopicName());
+         final MessageProducer producer = session.createProducer(topic);
+
+         // Sends a Topic message with Qpid JMS so that source message has the 
AMQP JMS mapping headers.
+         producer.send(session.createTextMessage("Message:"));
+      }
+
+      final AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(received);
+      received.reject(); // Terminal outcome should be DLQ'd
+
+      final ConnectionFactory factoryRemote = new JmsConnectionFactory(
+         "amqp://localhost:" + SERVER_PORT_REMOTE + 
"?jms.prefetchPolicy.all=0");
+
+      try (Connection connectionR = factoryRemote.createConnection();
+           Session sessionR = 
connectionR.createSession(Session.AUTO_ACKNOWLEDGE)) {
+
+         connectionR.start();
+
+         final Queue queue = sessionR.createQueue(getDeadLetterAddress());
+         final MessageConsumer consumer = sessionR.createConsumer(queue);
+
+         Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of(getDeadLetterAddress())).isExists(), 1_000);
+         Wait.assertTrue(() -> 
remoteServer.queueQuery(SimpleString.of(getDeadLetterAddress())).isExists(), 
1_000);
+         Wait.assertEquals(1L, () -> 
remoteServer.queueQuery(SimpleString.of(getDeadLetterAddress())).getMessageCount(),
 5_000, 10);
+
+         assertNotNull(consumer.receiveNoWait());
+      }
+   }
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRoutingTypeMismatchTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRoutingTypeMismatchTest.java
new file mode 100644
index 0000000000..bd08ee9f9c
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRoutingTypeMismatchTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPReceiverBrokerConnectionElement;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPSenderBrokerConnectionElement;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class AMQPRoutingTypeMismatchTest extends AmqpTestSupport {
+
+   private static final int AMQP_PORT_2 = 5673;
+
+   @Test
+   public void 
testReceiverTargetingQueueWithRoutingTypeNotMatchingOriginalMessageTarget() 
throws Exception {
+      final String DLQ_NAME = "topic.DLQ";
+
+      final ActiveMQServer server = createServer(AMQP_PORT, false);
+      server.getConfiguration().getAddressSettings().clear();
+      server.getConfiguration().addAddressSetting("#", new 
AddressSettings().setDeadLetterAddress(SimpleString.of(DLQ_NAME)).setRedeliveryDelay(0).setMaxDeliveryAttempts(1));
+      
server.getConfiguration().addQueueConfiguration(QueueConfiguration.of(DLQ_NAME).setRoutingType(RoutingType.ANYCAST));
+      server.getConfiguration().addAddressConfiguration(new 
CoreAddressConfiguration().setName(getName()).addRoutingType(RoutingType.MULTICAST));
+      server.setIdentity("Server1");
+      server.start();
+
+      final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", 
"tcp://localhost:" + AMQP_PORT);
+
+      try (Connection connection = factory.createConnection()) {
+         connection.setClientID("myID");
+
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         MessageConsumer consumer = 
session.createDurableConsumer(session.createTopic(getName()), "myTopic");
+         MessageProducer producer = 
session.createProducer(session.createTopic(getName()));
+
+         producer.send(session.createTextMessage("hello"));
+         session.commit();
+         connection.start();
+         assertNotNull(consumer.receive(5_000));
+         session.rollback();
+         assertNull(consumer.receiveNoWait());
+      }
+
+      Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of(DLQ_NAME)).isExists(), 5_000, 100);
+      final Queue dlq = server.locateQueue(DLQ_NAME);
+      Wait.assertEquals(1L, dlq::getMessageCount, 5_000, 100);
+
+      final AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT);
+      amqpConnection.addReceiver((AMQPReceiverBrokerConnectionElement) new 
AMQPReceiverBrokerConnectionElement().setMatchAddress(DLQ_NAME));
+
+      final ActiveMQServer server2 = createServer(AMQP_PORT_2, false);
+      server2.getConfiguration().getAddressSettings().clear();
+      server2.getConfiguration().addAddressSetting("#", new 
AddressSettings().setDeadLetterAddress(SimpleString.of(DLQ_NAME)).setRedeliveryDelay(0).setMaxDeliveryAttempts(1));
+      
server2.getConfiguration().addQueueConfiguration(QueueConfiguration.of(DLQ_NAME).setRoutingType(RoutingType.ANYCAST));
+      server2.setIdentity("Server2");
+      server2.getConfiguration().addAMQPConnection(amqpConnection);
+      server2.start();
+
+      Wait.assertTrue(() -> 
server2.queueQuery(SimpleString.of(DLQ_NAME)).isExists(), 5_000, 100);
+      final Queue dlqServer2 = server2.locateQueue(DLQ_NAME);
+      Wait.assertEquals(1L, dlqServer2::getMessageCount, 5_000, 100);
+   }
+
+   @Test
+   public void 
testSenderTargetingQueueWithRoutingTypeNotMatchingOriginalMessageTarget() 
throws Exception {
+      final String DLQ_NAME = "topic.DLQ";
+
+      final ActiveMQServer server2 = createServer(AMQP_PORT_2, false);
+      server2.getConfiguration().getAddressSettings().clear();
+      server2.getConfiguration().addAddressSetting("#", new 
AddressSettings().setDeadLetterAddress(SimpleString.of(DLQ_NAME)).setRedeliveryDelay(0).setMaxDeliveryAttempts(1));
+      
server2.getConfiguration().addQueueConfiguration(QueueConfiguration.of(DLQ_NAME).setRoutingType(RoutingType.ANYCAST));
+      server2.setIdentity("Server2");
+      server2.start();
+
+      final AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT_2);
+      amqpConnection.addSender((AMQPSenderBrokerConnectionElement) new 
AMQPSenderBrokerConnectionElement().setMatchAddress(DLQ_NAME));
+
+      final ActiveMQServer server = createServer(AMQP_PORT, false);
+      server.getConfiguration().getAddressSettings().clear();
+      server.getConfiguration().addAddressSetting("#", new 
AddressSettings().setDeadLetterAddress(SimpleString.of(DLQ_NAME)).setRedeliveryDelay(0).setMaxDeliveryAttempts(1));
+      
server.getConfiguration().addQueueConfiguration(QueueConfiguration.of(DLQ_NAME).setRoutingType(RoutingType.ANYCAST));
+      server.getConfiguration().addAddressConfiguration(new 
CoreAddressConfiguration().setName(getName()).addRoutingType(RoutingType.MULTICAST));
+      server.getConfiguration().addAMQPConnection(amqpConnection);
+      server.setIdentity("Server1");
+      server.start();
+
+      final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", 
"tcp://localhost:" + AMQP_PORT);
+
+      try (Connection connection = factory.createConnection()) {
+         connection.setClientID("myID");
+
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         MessageConsumer consumer = 
session.createDurableConsumer(session.createTopic(getName()), "myTopic");
+         MessageProducer producer = 
session.createProducer(session.createTopic(getName()));
+
+         producer.send(session.createTextMessage("hello"));
+         session.commit();
+         connection.start();
+         assertNotNull(consumer.receive(5_000));
+         session.rollback();
+         assertNull(consumer.receiveNoWait());
+      }
+
+      Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of(DLQ_NAME)).isExists(), 5_000, 100);
+      Wait.assertTrue(() -> 
server2.queueQuery(SimpleString.of(DLQ_NAME)).isExists(), 5_000, 100);
+
+      final Queue dlq = server.locateQueue(DLQ_NAME);
+      Wait.assertEquals(1, dlq::getConsumerCount, 5_000, 100); // SENDER 
listening on local DLQ for forwards
+      Wait.assertEquals(0L, dlq::getMessageCount, 5_000, 100);
+      Wait.assertEquals(1L, dlq::getMessagesAcknowledged, 5_000, 100);
+
+      final Queue dlqServer2 = server2.locateQueue(DLQ_NAME);
+      Wait.assertEquals(1L, dlqServer2::getMessageCount, 5_000, 100);
+   }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to