This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new fd5d9b9ad0 ARTEMIS-4658 Prevent reflections when using dual address federation
fd5d9b9ad0 is described below

commit fd5d9b9ad03e15d8b90b4861358ab55c67207525
Author: Timothy Bish <[email protected]>
AuthorDate: Wed Feb 28 11:03:59 2024 -0500

    ARTEMIS-4658 Prevent reflections when using dual address federation
    
    When federation is configured in two directions between nodes for an
    address, a message can reflect from one node back to the other if max hops
    is not set or is not set correctly. In some federation topologies no max
    hops value can solve the issue and still result in a working
    configuration. This reflection should be prevented at the federation
    consumer level for address consumers.
---
 .../amqp/broker/ProtonProtocolManager.java         |   2 +-
 .../federation/AMQPFederationAddressConsumer.java  |  23 ++-
 .../AMQPFederationAddressSenderController.java     |  44 +++-
 .../federation/AMQPFederationPolicySupport.java    |   8 +-
 .../amqp/AmqpInboundConnectionTest.java            |   4 +-
 .../connect/AMQPFederationAddressPolicyTest.java   |  57 +++++-
 .../connect/AMQPFederationServerToServerTest.java  | 226 +++++++++++++++++++++
 7 files changed, 339 insertions(+), 25 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index 07bb6ed407..6f246531e1 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -238,7 +238,7 @@ public class ProtonProtocolManager extends 
AbstractProtocolManager<AMQPMessage,
          ttl = 0;
       }
 
-      String id = server.getConfiguration().getName();
+      String id = server.getNodeID().toString();
       boolean useCoreSubscriptionNaming = 
server.getConfiguration().isAmqpUseCoreSubscriptionNaming();
       AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, 
connectionCallback, id, (int) ttl, getMaxFrameSize(), 
AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, 
server.getScheduledPool(), true, saslFactory, null, outgoing);
 
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
index dda6e95cd3..777452c708 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
@@ -62,11 +62,13 @@ import 
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMess
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpJmsSelectorFilter;
+import org.apache.activemq.artemis.protocol.amqp.proton.AmqpNoLocalFilter;
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import 
org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreLargeMessageReader;
 import 
org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreMessageReader;
 import org.apache.activemq.artemis.protocol.amqp.proton.MessageReader;
 import 
org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
+import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
@@ -274,19 +276,21 @@ public class AMQPFederationAddressConsumer implements 
FederationConsumerInternal
                source.setCapabilities(AmqpSupport.TOPIC_CAPABILITY);
             }
 
-            source.setOutcomes(Arrays.copyOf(DEFAULT_OUTCOMES, 
DEFAULT_OUTCOMES.length));
-            source.setDurable(TerminusDurability.NONE);
-            source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
-            source.setAddress(address);
+            final Map<Symbol, Object> filtersMap = new HashMap<>();
+            filtersMap.put(AmqpSupport.NO_LOCAL_NAME, 
AmqpNoLocalFilter.NO_LOCAL);
 
             if (consumerInfo.getFilterString() != null && 
!consumerInfo.getFilterString().isEmpty()) {
                final AmqpJmsSelectorFilter jmsFilter = new 
AmqpJmsSelectorFilter(consumerInfo.getFilterString());
-               final Map<Symbol, Object> filtersMap = new HashMap<>();
-               filtersMap.put(AmqpSupport.JMS_SELECTOR_KEY, jmsFilter);
 
-               source.setFilter(filtersMap);
+               filtersMap.put(AmqpSupport.JMS_SELECTOR_KEY, jmsFilter);
             }
 
+            source.setOutcomes(Arrays.copyOf(DEFAULT_OUTCOMES, 
DEFAULT_OUTCOMES.length));
+            source.setDurable(TerminusDurability.NONE);
+            source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+            source.setAddress(address);
+            source.setFilter(filtersMap);
+
             target.setAddress(address);
 
             final Map<String, Object> addressSourceProperties = new 
HashMap<>();
@@ -526,6 +530,11 @@ public class AMQPFederationAddressConsumer implements 
FederationConsumerInternal
 
             if (message instanceof ICoreMessage) {
                baseMessage = incrementCoreMessageHops((ICoreMessage) message);
+
+               // Add / Update the connection Id value to reflect the remote 
container Id so that the
+               // no-local filter of a federation address receiver directed 
back to the source of this
+               // message will exclude it as intended.
+               
baseMessage.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING, 
getConnection().getRemoteContainer());
             } else {
                baseMessage = incrementAMQPMessageHops((AMQPMessage) message);
             }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressSenderController.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressSenderController.java
index 2ac48d55a4..56759303f6 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressSenderController.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressSenderController.java
@@ -48,6 +48,7 @@ import 
org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import 
org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.SenderController;
+import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.selector.filter.FilterException;
 import org.apache.activemq.artemis.selector.impl.SelectorParser;
 import org.apache.qpid.proton.amqp.DescribedType;
@@ -117,19 +118,22 @@ public final class AMQPFederationAddressSenderController 
extends AMQPFederationB
       final long autoDeleteDelay = ((Number) 
addressSourceProperties.getOrDefault(ADDRESS_AUTO_DELETE_DELAY, 0)).longValue();
       final long autoDeleteMsgCount = ((Number) 
addressSourceProperties.getOrDefault(ADDRESS_AUTO_DELETE_MSG_COUNT, 
0)).longValue();
 
-      // An address receiver may opt to filter on things like max message hops 
so we must
-      // check for a filter here and apply it if it exists.
-      final Map.Entry<Symbol, DescribedType> filter = 
AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS);
+      // An address receiver may opt to filter on things like max message hops 
or no local message
+      // reflection so we must check for a filter here and apply it if it 
exists.
+      final String jmsSelector = getJMSSelectorFromFilters(source);
+      final Map.Entry<Symbol, DescribedType> noLocal = 
AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS);
 
-      if (filter != null) {
-         selector = filter.getValue().getDescribed().toString();
-         try {
-            SelectorParser.parse(selector);
-         } catch (FilterException e) {
-            throw new ActiveMQAMQPException(AmqpError.INVALID_FIELD, "Invalid 
filter", ActiveMQExceptionType.INVALID_FILTER_EXPRESSION);
+      if (noLocal != null) {
+         String remoteContainerId = protonConnection.getRemoteContainer();
+         String noLocalFilter = MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING 
+ "<>'" + remoteContainerId + "'";
+
+         if (jmsSelector == null) {
+            selector = noLocalFilter;
+         } else {
+            selector = jmsSelector + " AND " + noLocalFilter;
          }
       } else {
-         selector = null;
+         selector = jmsSelector;
       }
 
       final SimpleString address = 
SimpleString.toSimpleString(source.getAddress());
@@ -195,6 +199,26 @@ public final class AMQPFederationAddressSenderController 
extends AMQPFederationB
       return (Consumer) sessionSPI.createSender(senderContext, queueName, 
null, false);
    }
 
+   @SuppressWarnings("unchecked")
+   private String getJMSSelectorFromFilters(Source source) throws 
ActiveMQAMQPException {
+      final Map.Entry<Symbol, DescribedType> jmsSelector = 
AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS);
+
+      String selectorString = null;
+
+      // Validate the JMS selector if present.
+      if (jmsSelector != null) {
+         selectorString = jmsSelector.getValue().getDescribed().toString();
+
+         try {
+            SelectorParser.parse(selectorString);
+         } catch (FilterException e) {
+            throw new ActiveMQAMQPException(AmqpError.INVALID_FIELD, "Invalid 
filter", ActiveMQExceptionType.INVALID_FILTER_EXPRESSION);
+         }
+      }
+
+      return selectorString;
+   }
+
    private static RoutingType getRoutingType(Source source) {
       if (source != null) {
          if (source.getCapabilities() != null) {
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupport.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupport.java
index 6a6a1b709b..1e47d993b9 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupport.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupport.java
@@ -105,17 +105,17 @@ public final class AMQPFederationPolicySupport {
     * to indicate no max hops for federated messages on an address.
     *
     * @param maxHops
-    *    The max allowed number of hops before a message should stop cross 
federation links.
+    *    The max allowed number of hops before a message should stop crossing 
federation links.
     *
-    * @return the address filter string or null if not needed.
+    * @return the address filter string that should be applied (or null).
     */
    public static String generateAddressFilter(int maxHops) {
       if (maxHops <= 0) {
          return null;
       }
 
-      return "(\"m." + MESSAGE_HOPS_ANNOTATION.toString() +
-             "\" IS NULL OR \"m." + MESSAGE_HOPS_ANNOTATION.toString() +
+      return "(\"m." + MESSAGE_HOPS_ANNOTATION +
+             "\" IS NULL OR \"m." + MESSAGE_HOPS_ANNOTATION +
              "\"<" + maxHops + ")" +
              " AND " +
              "(" + MESSAGE_HOPS_PROPERTY + " IS NULL OR " +
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpInboundConnectionTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpInboundConnectionTest.java
index 02ff735503..1788ce4bca 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpInboundConnectionTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpInboundConnectionTest.java
@@ -75,6 +75,8 @@ public class AmqpInboundConnectionTest extends 
AmqpClientTestSupport {
 
    @Test(timeout = 60000)
    public void testBrokerContainerId() throws Exception {
+      final String containerId = server.getNodeID().toString();
+
       AmqpClient client = createAmqpClient();
       assertNotNull(client);
 
@@ -82,7 +84,7 @@ public class AmqpInboundConnectionTest extends 
AmqpClientTestSupport {
 
          @Override
          public void inspectOpenedResource(Connection connection) {
-            if (!BROKER_NAME.equals(connection.getRemoteContainer())) {
+            if (!containerId.equals(connection.getRemoteContainer())) {
                markAsInvalid("Broker did not send the expected container ID");
             }
          }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
index 21c4b55b12..4f9dfe18f1 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
@@ -100,6 +100,8 @@ import 
org.apache.activemq.artemis.protocol.amqp.federation.Federation;
 import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer;
 import 
org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
 import 
org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
+import org.apache.activemq.artemis.protocol.amqp.proton.AmqpJmsSelectorFilter;
+import org.apache.activemq.artemis.protocol.amqp.proton.AmqpNoLocalFilter;
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import 
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
 import org.apache.activemq.artemis.tests.util.CFUtil;
@@ -109,6 +111,10 @@ import org.apache.qpid.proton.amqp.transport.LinkError;
 import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
 import org.apache.qpid.protonj2.test.driver.ProtonTestPeer;
 import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
+import org.apache.qpid.protonj2.test.driver.codec.messaging.Source;
+import org.apache.qpid.protonj2.test.driver.codec.primitives.DescribedType;
+import org.apache.qpid.protonj2.test.driver.codec.primitives.Symbol;
+import org.apache.qpid.protonj2.test.driver.codec.transport.Attach;
 import org.apache.qpid.protonj2.test.driver.matchers.messaging.HeaderMatcher;
 import 
org.apache.qpid.protonj2.test.driver.matchers.messaging.MessageAnnotationsMatcher;
 import 
org.apache.qpid.protonj2.test.driver.matchers.messaging.PropertiesMatcher;
@@ -388,6 +394,15 @@ public class AMQPFederationAddressPolicyTest extends 
AmqpClientTestSupport {
 
    @Test(timeout = 20000)
    public void 
testFederationCreatesAddressReceiverLinkForAddressMatchWithMaxHopsFilter() 
throws Exception {
+      
doTestFederationCreatesAddressReceiverLinkForAddressWithCorrectFilters(true);
+   }
+
+   @Test(timeout = 20000)
+   public void 
testFederationCreatesAddressReceiverLinkForAddressMatchWithoutMaxHopsFilter() 
throws Exception {
+      
doTestFederationCreatesAddressReceiverLinkForAddressWithCorrectFilters(false);
+   }
+
+   private void 
doTestFederationCreatesAddressReceiverLinkForAddressWithCorrectFilters(boolean 
maxHops) throws Exception {
       try (ProtonTestServer peer = new ProtonTestServer()) {
          peer.expectSASLAnonymousConnect();
          peer.expectOpen().respond();
@@ -410,7 +425,11 @@ public class AMQPFederationAddressPolicyTest extends 
AmqpClientTestSupport {
 
          final AMQPFederationAddressPolicyElement receiveFromAddress = new 
AMQPFederationAddressPolicyElement();
          receiveFromAddress.setName("address-policy");
-         receiveFromAddress.setMaxHops(1);
+         if (maxHops) {
+            receiveFromAddress.setMaxHops(1);
+         } else {
+            receiveFromAddress.setMaxHops(0); // Disabled
+         }
          receiveFromAddress.addToIncludes("test");
          receiveFromAddress.setAutoDelete(true);
          receiveFromAddress.setAutoDeleteDelay(10_000L);
@@ -430,12 +449,24 @@ public class AMQPFederationAddressPolicyTest extends 
AmqpClientTestSupport {
          server.addAddressInfo(new 
AddressInfo(SimpleString.toSimpleString("test"), RoutingType.MULTICAST));
 
          final String expectedJMSFilter = generateAddressFilter(1);
+         final Symbol jmsSelectorKey = Symbol.valueOf("jms-selector");
+         final Symbol noLocalKey = 
Symbol.valueOf("apache.org:no-local-filter:list");
+         final 
org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong noLocalCode =
+            
org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong.valueOf(0x0000468C00000003L);
+         final 
org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong 
jmsSelectorCode =
+            
org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong.valueOf(0x0000468C00000004L);
+
+         final Map<String, Object> selectors = new HashMap<>();
+         selectors.put(AmqpSupport.JMS_SELECTOR_KEY.toString(), new 
AmqpJmsSelectorFilter(expectedJMSFilter));
+         selectors.put(AmqpSupport.NO_LOCAL_NAME.toString(), new 
AmqpNoLocalFilter());
 
          final Map<String, Object> expectedSourceProperties = new HashMap<>();
          expectedSourceProperties.put(ADDRESS_AUTO_DELETE, true);
          expectedSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 10_000L);
          expectedSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
 
+         final AtomicReference<Attach> capturedAttach = new 
AtomicReference<>();
+
          peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
          peer.expectAttach().ofReceiver()
                             
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
@@ -444,7 +475,7 @@ public class AMQPFederationAddressPolicyTest extends 
AmqpClientTestSupport {
                                             containsString("address-receiver"),
                                             
containsString(server.getNodeID().toString())))
                             
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), 
expectedSourceProperties)
-                            
.withSource().withJMSSelector(expectedJMSFilter).and()
+                            .withCapture(attach -> capturedAttach.set(attach))
                             .respond()
                             
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
          peer.expectFlow().withLinkCredit(1000);
@@ -472,6 +503,28 @@ public class AMQPFederationAddressPolicyTest extends 
AmqpClientTestSupport {
 
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
 
+            assertNotNull(capturedAttach.get());
+
+            final Source remoteSource = capturedAttach.get().getSource();
+            assertNotNull(remoteSource);
+            final Map<Symbol, Object> filtersMap = remoteSource.getFilter();
+            assertNotNull(filtersMap);
+
+            if (maxHops) {
+               assertTrue(filtersMap.containsKey(jmsSelectorKey));
+               final DescribedType jmsSelectorEntry = (DescribedType) 
filtersMap.get(jmsSelectorKey);
+               assertNotNull(jmsSelectorEntry);
+               assertEquals(jmsSelectorEntry.getDescriptor(), jmsSelectorCode);
+               assertEquals(jmsSelectorEntry.getDescribed().toString(), 
expectedJMSFilter);
+            } else {
+               assertFalse(filtersMap.containsKey(jmsSelectorKey));
+            }
+
+            assertTrue(filtersMap.containsKey(noLocalKey));
+            final DescribedType noLocalEntry = (DescribedType) 
filtersMap.get(noLocalKey);
+            assertNotNull(noLocalEntry);
+            assertEquals(noLocalEntry.getDescriptor(), noLocalCode);
+
             // Check that annotation for hops is present in the forwarded 
message.
             final HeaderMatcher headerMatcher = new HeaderMatcher(true);
             final MessageAnnotationsMatcher annotationsMatcher = new 
MessageAnnotationsMatcher(true);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
index 3165c149d5..39075bcd5a 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
@@ -1101,4 +1101,230 @@ public class AMQPFederationServerToServerTest extends 
AmqpClientTestSupport {
          assertEquals("green", receivedR.getStringProperty("color"));
       }
    }
+
+   @Test(timeout = 20000)
+   public void 
testAddressFederatedOverSingleConnectionNotReflectedBackToSendingNodeAMQP() 
throws Exception {
+      
doTestAddressFederatedOverSingleConnectionNotReflectedBackToSendingNode("AMQP");
+   }
+
+   @Test(timeout = 20000)
+   public void 
testAddressFederatedOverSingleConnectionNotReflectedBackToSendingNodeCore() 
throws Exception {
+      
doTestAddressFederatedOverSingleConnectionNotReflectedBackToSendingNode("CORE");
+   }
+
+   private void 
doTestAddressFederatedOverSingleConnectionNotReflectedBackToSendingNode(String 
clientProtocol) throws Exception {
+      logger.info("Test started: {}", getTestName());
+
+      final AMQPFederationAddressPolicyElement localAddressPolicy = new 
AMQPFederationAddressPolicyElement();
+      localAddressPolicy.setName("local-test-policy");
+      localAddressPolicy.addToIncludes("test");
+      localAddressPolicy.setAutoDelete(false);
+      localAddressPolicy.setAutoDeleteDelay(-1L);
+      localAddressPolicy.setAutoDeleteMessageCount(-1L);
+      localAddressPolicy.setMaxHops(0); // Disable max hops
+
+      final AMQPFederationAddressPolicyElement remoteAddressPolicy = new 
AMQPFederationAddressPolicyElement();
+      remoteAddressPolicy.setName("remote-test-policy");
+      remoteAddressPolicy.addToIncludes("test");
+      remoteAddressPolicy.setAutoDelete(false);
+      remoteAddressPolicy.setAutoDeleteDelay(-1L);
+      remoteAddressPolicy.setAutoDeleteMessageCount(-1L);
+      remoteAddressPolicy.setMaxHops(0); // Disable max hops
+
+      final AMQPFederatedBrokerConnectionElement element = new 
AMQPFederatedBrokerConnectionElement();
+      element.setName(getTestName());
+      element.addLocalAddressPolicy(localAddressPolicy);
+      element.addRemoteAddressPolicy(remoteAddressPolicy);
+
+      final AMQPBrokerConnectConfiguration amqpConnection =
+         new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" 
+ SERVER_PORT_REMOTE);
+      amqpConnection.setReconnectAttempts(10);// Limit reconnects
+      amqpConnection.addElement(element);
+
+      server.getConfiguration().addAMQPConnection(amqpConnection);
+      remoteServer.start();
+      server.start();
+
+      final ConnectionFactory factoryLocal = 
CFUtil.createConnectionFactory(clientProtocol, "tcp://localhost:" + 
SERVER_PORT);
+      final ConnectionFactory factoryRemote = 
CFUtil.createConnectionFactory(clientProtocol, "tcp://localhost:" + 
SERVER_PORT_REMOTE);
+
+      try (Connection connectionL = factoryLocal.createConnection();
+           Connection connectionR = factoryRemote.createConnection()) {
+
+         final Session sessionL = 
connectionL.createSession(Session.AUTO_ACKNOWLEDGE);
+         final Session sessionR = 
connectionR.createSession(Session.AUTO_ACKNOWLEDGE);
+
+         final Topic topic = sessionL.createTopic("test");
+
+         final MessageConsumer consumerL = sessionL.createConsumer(topic);
+         final MessageConsumer consumerR = sessionR.createConsumer(topic);
+
+         final MessageProducer producerL = sessionL.createProducer(topic);
+         final MessageProducer producerR = sessionR.createProducer(topic);
+
+         final TextMessage messageFromL = sessionL.createTextMessage("local");
+         final TextMessage messageFromR = sessionR.createTextMessage("remote");
+
+         connectionL.start();
+         connectionR.start();
+
+         final SimpleString addressName = SimpleString.toSimpleString("test");
+
+         Wait.assertTrue(() -> server.addressQuery(addressName).isExists());
+         Wait.assertTrue(() -> 
remoteServer.addressQuery(addressName).isExists());
+
+         assertNull(consumerL.receiveNoWait());
+         assertNull(consumerR.receiveNoWait());
+
+         // Captures state of JMS consumer and federation consumer attached on 
each node
+         Wait.assertTrue(() -> server.bindingQuery(addressName, 
false).getQueueNames().size() >= 2);
+         Wait.assertTrue(() -> remoteServer.bindingQuery(addressName, 
false).getQueueNames().size() >= 2);
+
+         producerL.send(messageFromL);
+
+         final Message messageL1 = consumerL.receive();
+         final Message messageR1 = consumerR.receive();
+
+         assertNotNull(messageL1);
+         assertNotNull(messageR1);
+         assertTrue(messageL1 instanceof TextMessage);
+         assertTrue(messageR1 instanceof TextMessage);
+         assertEquals("local", ((TextMessage) messageL1).getText());
+         assertEquals("local", ((TextMessage) messageR1).getText());
+
+         producerR.send(messageFromR);
+
+         final Message messageL2 = consumerL.receive();
+         final Message messageR2 = consumerR.receive();
+
+         assertNotNull(messageL2);
+         assertNotNull(messageR2);
+         assertTrue(messageL2 instanceof TextMessage);
+         assertTrue(messageR2 instanceof TextMessage);
+         assertEquals("remote", ((TextMessage) messageL2).getText());
+         assertEquals("remote", ((TextMessage) messageR2).getText());
+
+         // Should be no other messages routed
+         assertNull(consumerL.receiveNoWait());
+         assertNull(consumerR.receiveNoWait());
+      }
+   }
+
+   @Test(timeout = 20000)
+   public void 
testAddressFederatedOnTwoConnectionsNotReflectedBackToSendingNodeAMQP() throws 
Exception {
+      
doTestAddressFederatedOverTwoConnectionNotReflectedBackToSendingNode("AMQP");
+   }
+
+   @Test(timeout = 20000)
+   public void 
testAddressFederatedOnTwoConnectionsNotReflectedBackToSendingNodeCore() throws 
Exception {
+      
doTestAddressFederatedOverTwoConnectionNotReflectedBackToSendingNode("CORE");
+   }
+
+   private void 
doTestAddressFederatedOverTwoConnectionNotReflectedBackToSendingNode(String 
clientProtocol) throws Exception {
+      logger.info("Test started: {}", getTestName());
+
+      final AMQPFederationAddressPolicyElement localAddressPolicy1 = new 
AMQPFederationAddressPolicyElement();
+      localAddressPolicy1.setName("local-test-policy");
+      localAddressPolicy1.addToIncludes("test");
+      localAddressPolicy1.setAutoDelete(false);
+      localAddressPolicy1.setAutoDeleteDelay(-1L);
+      localAddressPolicy1.setAutoDeleteMessageCount(-1L);
+      localAddressPolicy1.setMaxHops(0); // Disable max hops
+
+      final AMQPFederationAddressPolicyElement localAddressPolicy2 = new 
AMQPFederationAddressPolicyElement();
+      localAddressPolicy2.setName("remote-test-policy");
+      localAddressPolicy2.addToIncludes("test");
+      localAddressPolicy2.setAutoDelete(false);
+      localAddressPolicy2.setAutoDeleteDelay(-1L);
+      localAddressPolicy2.setAutoDeleteMessageCount(-1L);
+      localAddressPolicy2.setMaxHops(0); // Disable max hops
+
+      final AMQPFederatedBrokerConnectionElement element1 = new 
AMQPFederatedBrokerConnectionElement();
+      element1.setName(getTestName() + ":1");
+      element1.addLocalAddressPolicy(localAddressPolicy1);
+
+      final AMQPFederatedBrokerConnectionElement element2 = new 
AMQPFederatedBrokerConnectionElement();
+      element2.setName(getTestName() + "2");
+      element2.addLocalAddressPolicy(localAddressPolicy2);
+
+      final AMQPBrokerConnectConfiguration amqpConnection1 =
+         new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" 
+ SERVER_PORT_REMOTE);
+      amqpConnection1.setReconnectAttempts(10);// Limit reconnects
+      amqpConnection1.addElement(element1);
+
+      final AMQPBrokerConnectConfiguration amqpConnection2 =
+         new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" 
+ SERVER_PORT);
+      amqpConnection2.setReconnectAttempts(10);// Limit reconnects
+      amqpConnection2.addElement(element2);
+
+      server.getConfiguration().addAMQPConnection(amqpConnection1);
+      remoteServer.getConfiguration().addAMQPConnection(amqpConnection2);
+      remoteServer.start();
+      server.start();
+
+      final ConnectionFactory factoryLocal = 
CFUtil.createConnectionFactory(clientProtocol, "tcp://localhost:" + 
SERVER_PORT);
+      final ConnectionFactory factoryRemote = 
CFUtil.createConnectionFactory(clientProtocol, "tcp://localhost:" + 
SERVER_PORT_REMOTE);
+
+      try (Connection connectionL = factoryLocal.createConnection();
+           Connection connectionR = factoryRemote.createConnection()) {
+
+         final Session sessionL = 
connectionL.createSession(Session.AUTO_ACKNOWLEDGE);
+         final Session sessionR = 
connectionR.createSession(Session.AUTO_ACKNOWLEDGE);
+
+         final Topic topic = sessionL.createTopic("test");
+
+         final MessageConsumer consumerL = sessionL.createConsumer(topic);
+         final MessageConsumer consumerR = sessionR.createConsumer(topic);
+
+         final MessageProducer producerL = sessionL.createProducer(topic);
+         final MessageProducer producerR = sessionR.createProducer(topic);
+
+         final TextMessage messageFromL = sessionL.createTextMessage("local");
+         final TextMessage messageFromR = sessionR.createTextMessage("remote");
+
+         connectionL.start();
+         connectionR.start();
+
+         final SimpleString addressName = SimpleString.toSimpleString("test");
+
+         Wait.assertTrue(() -> server.addressQuery(addressName).isExists());
+         Wait.assertTrue(() -> 
remoteServer.addressQuery(addressName).isExists());
+
+         assertNull(consumerL.receiveNoWait());
+         assertNull(consumerR.receiveNoWait());
+
+         // Captures state of JMS consumer and federation consumer attached on 
each node
+         Wait.assertTrue(() -> server.bindingQuery(addressName, 
false).getQueueNames().size() >= 2);
+         Wait.assertTrue(() -> remoteServer.bindingQuery(addressName, 
false).getQueueNames().size() >= 2);
+
+         producerL.send(messageFromL);
+
+         final Message messageL1 = consumerL.receive();
+         final Message messageR1 = consumerR.receive();
+
+         assertNotNull(messageL1);
+         assertNotNull(messageR1);
+         assertTrue(messageL1 instanceof TextMessage);
+         assertTrue(messageR1 instanceof TextMessage);
+         assertEquals("local", ((TextMessage) messageL1).getText());
+         assertEquals("local", ((TextMessage) messageR1).getText());
+
+         producerR.send(messageFromR);
+
+         final Message messageL2 = consumerL.receive();
+         final Message messageR2 = consumerR.receive();
+
+         assertNotNull(messageL2);
+         assertNotNull(messageR2);
+         assertTrue(messageL2 instanceof TextMessage);
+         assertTrue(messageR2 instanceof TextMessage);
+         assertEquals("remote", ((TextMessage) messageL2).getText());
+         assertEquals("remote", ((TextMessage) messageR2).getText());
+
+         // Should be no other messages routed
+         assertNull(consumerL.receiveNoWait());
+         assertNull(consumerR.receiveNoWait());
+      }
+   }
 }
+

Reply via email to