This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new fd5d9b9ad0 ARTEMIS-4658 Prevent reflections when using dual address federation
fd5d9b9ad0 is described below
commit fd5d9b9ad03e15d8b90b4861358ab55c67207525
Author: Timothy Bish <[email protected]>
AuthorDate: Wed Feb 28 11:03:59 2024 -0500
ARTEMIS-4658 Prevent reflections when using dual address federation
When federation is configured in both directions between two nodes for an
address, a message can be reflected from one node back to the other if max
hops is not set or is not set correctly. In some federation topologies no
max hops value can solve the issue and still result in a working
configuration. This reflection should be prevented at the federation
consumer level for address consumers.
---
.../amqp/broker/ProtonProtocolManager.java | 2 +-
.../federation/AMQPFederationAddressConsumer.java | 23 ++-
.../AMQPFederationAddressSenderController.java | 44 +++-
.../federation/AMQPFederationPolicySupport.java | 8 +-
.../amqp/AmqpInboundConnectionTest.java | 4 +-
.../connect/AMQPFederationAddressPolicyTest.java | 57 +++++-
.../connect/AMQPFederationServerToServerTest.java | 226 +++++++++++++++++++++
7 files changed, 339 insertions(+), 25 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index 07bb6ed407..6f246531e1 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -238,7 +238,7 @@ public class ProtonProtocolManager extends
AbstractProtocolManager<AMQPMessage,
ttl = 0;
}
- String id = server.getConfiguration().getName();
+ String id = server.getNodeID().toString();
boolean useCoreSubscriptionNaming =
server.getConfiguration().isAmqpUseCoreSubscriptionNaming();
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this,
connectionCallback, id, (int) ttl, getMaxFrameSize(),
AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming,
server.getScheduledPool(), true, saslFactory, null, outgoing);
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
index dda6e95cd3..777452c708 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
@@ -62,11 +62,13 @@ import
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMess
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpJmsSelectorFilter;
+import org.apache.activemq.artemis.protocol.amqp.proton.AmqpNoLocalFilter;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import
org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreLargeMessageReader;
import
org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreMessageReader;
import org.apache.activemq.artemis.protocol.amqp.proton.MessageReader;
import
org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
+import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
@@ -274,19 +276,21 @@ public class AMQPFederationAddressConsumer implements
FederationConsumerInternal
source.setCapabilities(AmqpSupport.TOPIC_CAPABILITY);
}
- source.setOutcomes(Arrays.copyOf(DEFAULT_OUTCOMES,
DEFAULT_OUTCOMES.length));
- source.setDurable(TerminusDurability.NONE);
- source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
- source.setAddress(address);
+ final Map<Symbol, Object> filtersMap = new HashMap<>();
+ filtersMap.put(AmqpSupport.NO_LOCAL_NAME,
AmqpNoLocalFilter.NO_LOCAL);
if (consumerInfo.getFilterString() != null &&
!consumerInfo.getFilterString().isEmpty()) {
final AmqpJmsSelectorFilter jmsFilter = new
AmqpJmsSelectorFilter(consumerInfo.getFilterString());
- final Map<Symbol, Object> filtersMap = new HashMap<>();
- filtersMap.put(AmqpSupport.JMS_SELECTOR_KEY, jmsFilter);
- source.setFilter(filtersMap);
+ filtersMap.put(AmqpSupport.JMS_SELECTOR_KEY, jmsFilter);
}
+ source.setOutcomes(Arrays.copyOf(DEFAULT_OUTCOMES,
DEFAULT_OUTCOMES.length));
+ source.setDurable(TerminusDurability.NONE);
+ source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+ source.setAddress(address);
+ source.setFilter(filtersMap);
+
target.setAddress(address);
final Map<String, Object> addressSourceProperties = new
HashMap<>();
@@ -526,6 +530,11 @@ public class AMQPFederationAddressConsumer implements
FederationConsumerInternal
if (message instanceof ICoreMessage) {
baseMessage = incrementCoreMessageHops((ICoreMessage) message);
+
+ // Add / Update the connection Id value to reflect the remote
container Id so that the
+ // no-local filter of a federation address receiver directed
back to the source of this
+ // message will exclude it as intended.
+
baseMessage.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING,
getConnection().getRemoteContainer());
} else {
baseMessage = incrementAMQPMessageHops((AMQPMessage) message);
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressSenderController.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressSenderController.java
index 2ac48d55a4..56759303f6 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressSenderController.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressSenderController.java
@@ -48,6 +48,7 @@ import
org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import
org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.protocol.amqp.proton.SenderController;
+import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
import org.apache.qpid.proton.amqp.DescribedType;
@@ -117,19 +118,22 @@ public final class AMQPFederationAddressSenderController
extends AMQPFederationB
final long autoDeleteDelay = ((Number)
addressSourceProperties.getOrDefault(ADDRESS_AUTO_DELETE_DELAY, 0)).longValue();
final long autoDeleteMsgCount = ((Number)
addressSourceProperties.getOrDefault(ADDRESS_AUTO_DELETE_MSG_COUNT,
0)).longValue();
- // An address receiver may opt to filter on things like max message hops
so we must
- // check for a filter here and apply it if it exists.
- final Map.Entry<Symbol, DescribedType> filter =
AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS);
+ // An address receiver may opt to filter on things like max message hops
or no local message
+ // reflection so we must check for a filter here and apply it if it
exists.
+ final String jmsSelector = getJMSSelectorFromFilters(source);
+ final Map.Entry<Symbol, DescribedType> noLocal =
AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS);
- if (filter != null) {
- selector = filter.getValue().getDescribed().toString();
- try {
- SelectorParser.parse(selector);
- } catch (FilterException e) {
- throw new ActiveMQAMQPException(AmqpError.INVALID_FIELD, "Invalid
filter", ActiveMQExceptionType.INVALID_FILTER_EXPRESSION);
+ if (noLocal != null) {
+ String remoteContainerId = protonConnection.getRemoteContainer();
+ String noLocalFilter = MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING
+ "<>'" + remoteContainerId + "'";
+
+ if (jmsSelector == null) {
+ selector = noLocalFilter;
+ } else {
+ selector = jmsSelector + " AND " + noLocalFilter;
}
} else {
- selector = null;
+ selector = jmsSelector;
}
final SimpleString address =
SimpleString.toSimpleString(source.getAddress());
@@ -195,6 +199,26 @@ public final class AMQPFederationAddressSenderController
extends AMQPFederationB
return (Consumer) sessionSPI.createSender(senderContext, queueName,
null, false);
}
+ @SuppressWarnings("unchecked")
+ private String getJMSSelectorFromFilters(Source source) throws
ActiveMQAMQPException {
+ final Map.Entry<Symbol, DescribedType> jmsSelector =
AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS);
+
+ String selectorString = null;
+
+ // Validate the JMS selector if present.
+ if (jmsSelector != null) {
+ selectorString = jmsSelector.getValue().getDescribed().toString();
+
+ try {
+ SelectorParser.parse(selectorString);
+ } catch (FilterException e) {
+ throw new ActiveMQAMQPException(AmqpError.INVALID_FIELD, "Invalid
filter", ActiveMQExceptionType.INVALID_FILTER_EXPRESSION);
+ }
+ }
+
+ return selectorString;
+ }
+
private static RoutingType getRoutingType(Source source) {
if (source != null) {
if (source.getCapabilities() != null) {
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupport.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupport.java
index 6a6a1b709b..1e47d993b9 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupport.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupport.java
@@ -105,17 +105,17 @@ public final class AMQPFederationPolicySupport {
* to indicate no max hops for federated messages on an address.
*
* @param maxHops
- * The max allowed number of hops before a message should stop cross
federation links.
+ * The max allowed number of hops before a message should stop crossing
federation links.
*
- * @return the address filter string or null if not needed.
+ * @return the address filter string that should be applied (or null).
*/
public static String generateAddressFilter(int maxHops) {
if (maxHops <= 0) {
return null;
}
- return "(\"m." + MESSAGE_HOPS_ANNOTATION.toString() +
- "\" IS NULL OR \"m." + MESSAGE_HOPS_ANNOTATION.toString() +
+ return "(\"m." + MESSAGE_HOPS_ANNOTATION +
+ "\" IS NULL OR \"m." + MESSAGE_HOPS_ANNOTATION +
"\"<" + maxHops + ")" +
" AND " +
"(" + MESSAGE_HOPS_PROPERTY + " IS NULL OR " +
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpInboundConnectionTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpInboundConnectionTest.java
index 02ff735503..1788ce4bca 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpInboundConnectionTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpInboundConnectionTest.java
@@ -75,6 +75,8 @@ public class AmqpInboundConnectionTest extends
AmqpClientTestSupport {
@Test(timeout = 60000)
public void testBrokerContainerId() throws Exception {
+ final String containerId = server.getNodeID().toString();
+
AmqpClient client = createAmqpClient();
assertNotNull(client);
@@ -82,7 +84,7 @@ public class AmqpInboundConnectionTest extends
AmqpClientTestSupport {
@Override
public void inspectOpenedResource(Connection connection) {
- if (!BROKER_NAME.equals(connection.getRemoteContainer())) {
+ if (!containerId.equals(connection.getRemoteContainer())) {
markAsInvalid("Broker did not send the expected container ID");
}
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
index 21c4b55b12..4f9dfe18f1 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
@@ -100,6 +100,8 @@ import
org.apache.activemq.artemis.protocol.amqp.federation.Federation;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer;
import
org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import
org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
+import org.apache.activemq.artemis.protocol.amqp.proton.AmqpJmsSelectorFilter;
+import org.apache.activemq.artemis.protocol.amqp.proton.AmqpNoLocalFilter;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
@@ -109,6 +111,10 @@ import org.apache.qpid.proton.amqp.transport.LinkError;
import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
import org.apache.qpid.protonj2.test.driver.ProtonTestPeer;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
+import org.apache.qpid.protonj2.test.driver.codec.messaging.Source;
+import org.apache.qpid.protonj2.test.driver.codec.primitives.DescribedType;
+import org.apache.qpid.protonj2.test.driver.codec.primitives.Symbol;
+import org.apache.qpid.protonj2.test.driver.codec.transport.Attach;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.HeaderMatcher;
import
org.apache.qpid.protonj2.test.driver.matchers.messaging.MessageAnnotationsMatcher;
import
org.apache.qpid.protonj2.test.driver.matchers.messaging.PropertiesMatcher;
@@ -388,6 +394,15 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
@Test(timeout = 20000)
public void
testFederationCreatesAddressReceiverLinkForAddressMatchWithMaxHopsFilter()
throws Exception {
+
doTestFederationCreatesAddressReceiverLinkForAddressWithCorrectFilters(true);
+ }
+
+ @Test(timeout = 20000)
+ public void
testFederationCreatesAddressReceiverLinkForAddressMatchWithoutMaxHopsFilter()
throws Exception {
+
doTestFederationCreatesAddressReceiverLinkForAddressWithCorrectFilters(false);
+ }
+
+ private void
doTestFederationCreatesAddressReceiverLinkForAddressWithCorrectFilters(boolean
maxHops) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
@@ -410,7 +425,11 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
final AMQPFederationAddressPolicyElement receiveFromAddress = new
AMQPFederationAddressPolicyElement();
receiveFromAddress.setName("address-policy");
- receiveFromAddress.setMaxHops(1);
+ if (maxHops) {
+ receiveFromAddress.setMaxHops(1);
+ } else {
+ receiveFromAddress.setMaxHops(0); // Disabled
+ }
receiveFromAddress.addToIncludes("test");
receiveFromAddress.setAutoDelete(true);
receiveFromAddress.setAutoDeleteDelay(10_000L);
@@ -430,12 +449,24 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
server.addAddressInfo(new
AddressInfo(SimpleString.toSimpleString("test"), RoutingType.MULTICAST));
final String expectedJMSFilter = generateAddressFilter(1);
+ final Symbol jmsSelectorKey = Symbol.valueOf("jms-selector");
+ final Symbol noLocalKey =
Symbol.valueOf("apache.org:no-local-filter:list");
+ final
org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong noLocalCode =
+
org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong.valueOf(0x0000468C00000003L);
+ final
org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong
jmsSelectorCode =
+
org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong.valueOf(0x0000468C00000004L);
+
+ final Map<String, Object> selectors = new HashMap<>();
+ selectors.put(AmqpSupport.JMS_SELECTOR_KEY.toString(), new
AmqpJmsSelectorFilter(expectedJMSFilter));
+ selectors.put(AmqpSupport.NO_LOCAL_NAME.toString(), new
AmqpNoLocalFilter());
final Map<String, Object> expectedSourceProperties = new HashMap<>();
expectedSourceProperties.put(ADDRESS_AUTO_DELETE, true);
expectedSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 10_000L);
expectedSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
+ final AtomicReference<Attach> capturedAttach = new
AtomicReference<>();
+
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
@@ -444,7 +475,7 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
containsString("address-receiver"),
containsString(server.getNodeID().toString())))
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(),
expectedSourceProperties)
-
.withSource().withJMSSelector(expectedJMSFilter).and()
+ .withCapture(attach -> capturedAttach.set(attach))
.respond()
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
peer.expectFlow().withLinkCredit(1000);
@@ -472,6 +503,28 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ assertNotNull(capturedAttach.get());
+
+ final Source remoteSource = capturedAttach.get().getSource();
+ assertNotNull(remoteSource);
+ final Map<Symbol, Object> filtersMap = remoteSource.getFilter();
+ assertNotNull(filtersMap);
+
+ if (maxHops) {
+ assertTrue(filtersMap.containsKey(jmsSelectorKey));
+ final DescribedType jmsSelectorEntry = (DescribedType)
filtersMap.get(jmsSelectorKey);
+ assertNotNull(jmsSelectorEntry);
+ assertEquals(jmsSelectorEntry.getDescriptor(), jmsSelectorCode);
+ assertEquals(jmsSelectorEntry.getDescribed().toString(),
expectedJMSFilter);
+ } else {
+ assertFalse(filtersMap.containsKey(jmsSelectorKey));
+ }
+
+ assertTrue(filtersMap.containsKey(noLocalKey));
+ final DescribedType noLocalEntry = (DescribedType)
filtersMap.get(noLocalKey);
+ assertNotNull(noLocalEntry);
+ assertEquals(noLocalEntry.getDescriptor(), noLocalCode);
+
// Check that annotation for hops is present in the forwarded
message.
final HeaderMatcher headerMatcher = new HeaderMatcher(true);
final MessageAnnotationsMatcher annotationsMatcher = new
MessageAnnotationsMatcher(true);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
index 3165c149d5..39075bcd5a 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
@@ -1101,4 +1101,230 @@ public class AMQPFederationServerToServerTest extends
AmqpClientTestSupport {
assertEquals("green", receivedR.getStringProperty("color"));
}
}
+
+ @Test(timeout = 20000)
+ public void
testAddressFederatedOverSingleConnectionNotReflectedBackToSendingNodeAMQP()
throws Exception {
+
doTestAddressFederatedOverSingleConnectionNotReflectedBackToSendingNode("AMQP");
+ }
+
+ @Test(timeout = 20000)
+ public void
testAddressFederatedOverSingleConnectionNotReflectedBackToSendingNodeCore()
throws Exception {
+
doTestAddressFederatedOverSingleConnectionNotReflectedBackToSendingNode("CORE");
+ }
+
+ private void
doTestAddressFederatedOverSingleConnectionNotReflectedBackToSendingNode(String
clientProtocol) throws Exception {
+ logger.info("Test started: {}", getTestName());
+
+ final AMQPFederationAddressPolicyElement localAddressPolicy = new
AMQPFederationAddressPolicyElement();
+ localAddressPolicy.setName("local-test-policy");
+ localAddressPolicy.addToIncludes("test");
+ localAddressPolicy.setAutoDelete(false);
+ localAddressPolicy.setAutoDeleteDelay(-1L);
+ localAddressPolicy.setAutoDeleteMessageCount(-1L);
+ localAddressPolicy.setMaxHops(0); // Disable max hops
+
+ final AMQPFederationAddressPolicyElement remoteAddressPolicy = new
AMQPFederationAddressPolicyElement();
+ remoteAddressPolicy.setName("remote-test-policy");
+ remoteAddressPolicy.addToIncludes("test");
+ remoteAddressPolicy.setAutoDelete(false);
+ remoteAddressPolicy.setAutoDeleteDelay(-1L);
+ remoteAddressPolicy.setAutoDeleteMessageCount(-1L);
+ remoteAddressPolicy.setMaxHops(0); // Disable max hops
+
+ final AMQPFederatedBrokerConnectionElement element = new
AMQPFederatedBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addLocalAddressPolicy(localAddressPolicy);
+ element.addRemoteAddressPolicy(remoteAddressPolicy);
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:"
+ SERVER_PORT_REMOTE);
+ amqpConnection.setReconnectAttempts(10);// Limit reconnects
+ amqpConnection.addElement(element);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ remoteServer.start();
+ server.start();
+
+ final ConnectionFactory factoryLocal =
CFUtil.createConnectionFactory(clientProtocol, "tcp://localhost:" +
SERVER_PORT);
+ final ConnectionFactory factoryRemote =
CFUtil.createConnectionFactory(clientProtocol, "tcp://localhost:" +
SERVER_PORT_REMOTE);
+
+ try (Connection connectionL = factoryLocal.createConnection();
+ Connection connectionR = factoryRemote.createConnection()) {
+
+ final Session sessionL =
connectionL.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Session sessionR =
connectionR.createSession(Session.AUTO_ACKNOWLEDGE);
+
+ final Topic topic = sessionL.createTopic("test");
+
+ final MessageConsumer consumerL = sessionL.createConsumer(topic);
+ final MessageConsumer consumerR = sessionR.createConsumer(topic);
+
+ final MessageProducer producerL = sessionL.createProducer(topic);
+ final MessageProducer producerR = sessionR.createProducer(topic);
+
+ final TextMessage messageFromL = sessionL.createTextMessage("local");
+ final TextMessage messageFromR = sessionR.createTextMessage("remote");
+
+ connectionL.start();
+ connectionR.start();
+
+ final SimpleString addressName = SimpleString.toSimpleString("test");
+
+ Wait.assertTrue(() -> server.addressQuery(addressName).isExists());
+ Wait.assertTrue(() ->
remoteServer.addressQuery(addressName).isExists());
+
+ assertNull(consumerL.receiveNoWait());
+ assertNull(consumerR.receiveNoWait());
+
+ // Captures state of JMS consumer and federation consumer attached on
each node
+ Wait.assertTrue(() -> server.bindingQuery(addressName,
false).getQueueNames().size() >= 2);
+ Wait.assertTrue(() -> remoteServer.bindingQuery(addressName,
false).getQueueNames().size() >= 2);
+
+ producerL.send(messageFromL);
+
+ final Message messageL1 = consumerL.receive();
+ final Message messageR1 = consumerR.receive();
+
+ assertNotNull(messageL1);
+ assertNotNull(messageR1);
+ assertTrue(messageL1 instanceof TextMessage);
+ assertTrue(messageR1 instanceof TextMessage);
+ assertEquals("local", ((TextMessage) messageL1).getText());
+ assertEquals("local", ((TextMessage) messageR1).getText());
+
+ producerR.send(messageFromR);
+
+ final Message messageL2 = consumerL.receive();
+ final Message messageR2 = consumerR.receive();
+
+ assertNotNull(messageL2);
+ assertNotNull(messageR2);
+ assertTrue(messageL2 instanceof TextMessage);
+ assertTrue(messageR2 instanceof TextMessage);
+ assertEquals("remote", ((TextMessage) messageL2).getText());
+ assertEquals("remote", ((TextMessage) messageR2).getText());
+
+ // Should be no other messages routed
+ assertNull(consumerL.receiveNoWait());
+ assertNull(consumerR.receiveNoWait());
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void
testAddressFederatedOnTwoConnectionsNotReflectedBackToSendingNodeAMQP() throws
Exception {
+
doTestAddressFederatedOverTwoConnectionNotReflectedBackToSendingNode("AMQP");
+ }
+
+ @Test(timeout = 20000)
+ public void
testAddressFederatedOnTwoConnectionsNotReflectedBackToSendingNodeCore() throws
Exception {
+
doTestAddressFederatedOverTwoConnectionNotReflectedBackToSendingNode("CORE");
+ }
+
+ private void
doTestAddressFederatedOverTwoConnectionNotReflectedBackToSendingNode(String
clientProtocol) throws Exception {
+ logger.info("Test started: {}", getTestName());
+
+ final AMQPFederationAddressPolicyElement localAddressPolicy1 = new
AMQPFederationAddressPolicyElement();
+ localAddressPolicy1.setName("local-test-policy");
+ localAddressPolicy1.addToIncludes("test");
+ localAddressPolicy1.setAutoDelete(false);
+ localAddressPolicy1.setAutoDeleteDelay(-1L);
+ localAddressPolicy1.setAutoDeleteMessageCount(-1L);
+ localAddressPolicy1.setMaxHops(0); // Disable max hops
+
+ final AMQPFederationAddressPolicyElement localAddressPolicy2 = new
AMQPFederationAddressPolicyElement();
+ localAddressPolicy2.setName("remote-test-policy");
+ localAddressPolicy2.addToIncludes("test");
+ localAddressPolicy2.setAutoDelete(false);
+ localAddressPolicy2.setAutoDeleteDelay(-1L);
+ localAddressPolicy2.setAutoDeleteMessageCount(-1L);
+ localAddressPolicy2.setMaxHops(0); // Disable max hops
+
+ final AMQPFederatedBrokerConnectionElement element1 = new
AMQPFederatedBrokerConnectionElement();
+ element1.setName(getTestName() + ":1");
+ element1.addLocalAddressPolicy(localAddressPolicy1);
+
+ final AMQPFederatedBrokerConnectionElement element2 = new
AMQPFederatedBrokerConnectionElement();
+ element2.setName(getTestName() + "2");
+ element2.addLocalAddressPolicy(localAddressPolicy2);
+
+ final AMQPBrokerConnectConfiguration amqpConnection1 =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:"
+ SERVER_PORT_REMOTE);
+ amqpConnection1.setReconnectAttempts(10);// Limit reconnects
+ amqpConnection1.addElement(element1);
+
+ final AMQPBrokerConnectConfiguration amqpConnection2 =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:"
+ SERVER_PORT);
+ amqpConnection2.setReconnectAttempts(10);// Limit reconnects
+ amqpConnection2.addElement(element2);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection1);
+ remoteServer.getConfiguration().addAMQPConnection(amqpConnection2);
+ remoteServer.start();
+ server.start();
+
+ final ConnectionFactory factoryLocal =
CFUtil.createConnectionFactory(clientProtocol, "tcp://localhost:" +
SERVER_PORT);
+ final ConnectionFactory factoryRemote =
CFUtil.createConnectionFactory(clientProtocol, "tcp://localhost:" +
SERVER_PORT_REMOTE);
+
+ try (Connection connectionL = factoryLocal.createConnection();
+ Connection connectionR = factoryRemote.createConnection()) {
+
+ final Session sessionL =
connectionL.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Session sessionR =
connectionR.createSession(Session.AUTO_ACKNOWLEDGE);
+
+ final Topic topic = sessionL.createTopic("test");
+
+ final MessageConsumer consumerL = sessionL.createConsumer(topic);
+ final MessageConsumer consumerR = sessionR.createConsumer(topic);
+
+ final MessageProducer producerL = sessionL.createProducer(topic);
+ final MessageProducer producerR = sessionR.createProducer(topic);
+
+ final TextMessage messageFromL = sessionL.createTextMessage("local");
+ final TextMessage messageFromR = sessionR.createTextMessage("remote");
+
+ connectionL.start();
+ connectionR.start();
+
+ final SimpleString addressName = SimpleString.toSimpleString("test");
+
+ Wait.assertTrue(() -> server.addressQuery(addressName).isExists());
+ Wait.assertTrue(() ->
remoteServer.addressQuery(addressName).isExists());
+
+ assertNull(consumerL.receiveNoWait());
+ assertNull(consumerR.receiveNoWait());
+
+ // Captures state of JMS consumer and federation consumer attached on
each node
+ Wait.assertTrue(() -> server.bindingQuery(addressName,
false).getQueueNames().size() >= 2);
+ Wait.assertTrue(() -> remoteServer.bindingQuery(addressName,
false).getQueueNames().size() >= 2);
+
+ producerL.send(messageFromL);
+
+ final Message messageL1 = consumerL.receive();
+ final Message messageR1 = consumerR.receive();
+
+ assertNotNull(messageL1);
+ assertNotNull(messageR1);
+ assertTrue(messageL1 instanceof TextMessage);
+ assertTrue(messageR1 instanceof TextMessage);
+ assertEquals("local", ((TextMessage) messageL1).getText());
+ assertEquals("local", ((TextMessage) messageR1).getText());
+
+ producerR.send(messageFromR);
+
+ final Message messageL2 = consumerL.receive();
+ final Message messageR2 = consumerR.receive();
+
+ assertNotNull(messageL2);
+ assertNotNull(messageR2);
+ assertTrue(messageL2 instanceof TextMessage);
+ assertTrue(messageR2 instanceof TextMessage);
+ assertEquals("remote", ((TextMessage) messageL2).getText());
+ assertEquals("remote", ((TextMessage) messageR2).getText());
+
+ // Should be no other messages routed
+ assertNull(consumerL.receiveNoWait());
+ assertNull(consumerR.receiveNoWait());
+ }
+ }
}
+