gemmellr commented on code in PR #5220: URL: https://github.com/apache/activemq-artemis/pull/5220#discussion_r1869366885
########## artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java: ########## @@ -75,6 +77,15 @@ public AMQPMirrorBrokerConnectionElement setQueueCreation(boolean queueCreation) return this; } + public boolean getNoForward() { Review Comment: isNoForward would be more typical, and more consistent with the others around it. ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java: ########## @@ -186,6 +191,43 @@ public void testBrokerHandlesSenderLinkOmitsMirrorCapability() throws Exception } } + @Test + @Timeout(20) + public void testBrokerHandlesSenderLinkOmitsNoForwardCapability() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS"); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender() + .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR")) + .withDesiredCapabilities("amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString(), AMQPMirrorControllerSource.NO_FORWARD.toString()) + .respond() + .withOfferedCapabilities("amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()); Review Comment: Could use the constants for both rather than just one. ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java: ########## @@ -428,6 +428,12 @@ public HandleStatus handle(final MessageReference ref) throws Exception { return HandleStatus.NO_MATCH; } + if (callback != null && callback.filterRef(ref, ServerConsumerImpl.this)) { + if (logger.isDebugEnabled()) { + // TODO + } + return HandleStatus.NO_MATCH; Review Comment: Though different than the previous drop-inside-consumer approach, which also never actually sends the message, doing this instead still similarly means nothing ever consumes the message. 
So it's still just going to stay there forever (the difference this time is that it won't be in a zombie-delivering state). Something will need to be done with the message. ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java: ########## @@ -229,6 +271,173 @@ public void testBrokerAddsAddressAndQueue() throws Exception { } } + @Override + protected void configureAMQPAcceptorParameters(Map<String, Object> params) { + params.put("amqpMinLargeMessageSize", 100 * 1024); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksMessagesAndControlsPropagation() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(false, false); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksMessagesAndControlsPropagationWithTunneling() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(true, false); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksLargeMessagesAndControlsPropagation() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(false, true); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksLargeMessagesAndControlsPropagationWithTunneling() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(true, true); + } + + private void doTestNoForwardBlocksMessagesAndControlsPropagation(boolean tunneling, boolean longMessage) throws Exception { + final Map<String, Object> brokerProperties = new HashMap<>(); + brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker"); + + final String[] capabilities; + ArrayList<String> capabilitiesList = new ArrayList<>(); + int messageFormat = 0; + + capabilitiesList.add("amq.mirror"); Review Comment: similarly ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java: ########## @@ -229,6 +271,173 @@ public void testBrokerAddsAddressAndQueue() throws 
Exception { } } + @Override + protected void configureAMQPAcceptorParameters(Map<String, Object> params) { + params.put("amqpMinLargeMessageSize", 100 * 1024); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksMessagesAndControlsPropagation() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(false, false); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksMessagesAndControlsPropagationWithTunneling() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(true, false); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksLargeMessagesAndControlsPropagation() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(false, true); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksLargeMessagesAndControlsPropagationWithTunneling() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(true, true); + } + + private void doTestNoForwardBlocksMessagesAndControlsPropagation(boolean tunneling, boolean longMessage) throws Exception { + final Map<String, Object> brokerProperties = new HashMap<>(); + brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker"); + + final String[] capabilities; + ArrayList<String> capabilitiesList = new ArrayList<>(); + int messageFormat = 0; + + capabilitiesList.add("amq.mirror"); + if (tunneling) { + capabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()); + messageFormat = AMQP_TUNNELED_CORE_MESSAGE_FORMAT; + } + capabilities = capabilitiesList.toArray(new String[]{}); + + // Topology of the test: server -(noForward)-> server_2 -> peer_3 + try (ProtonTestServer peer_3 = new ProtonTestServer()) { + peer_3.expectSASLPlainConnect("user", "pass", "PLAIN", "ANONYMOUS"); + peer_3.expectOpen().respond(); + peer_3.expectBegin().respond(); + peer_3.expectAttach().ofSender() + .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR")) + .withDesiredCapabilities(capabilities) + .respond() + 
.withOfferedCapabilities(capabilities) + .withPropertiesMap(brokerProperties); + peer_3.remoteFlow().withLinkCredit(10).queue(); + peer_3.start(); + + final URI remoteURI = peer_3.getServerURI(); + logger.info("Connect test started, peer listening on: {}", remoteURI); + + final int AMQP_PORT_2 = BROKER_PORT_NUM + 1; + final ActiveMQServer server_2 = createServer(AMQP_PORT_2, false); + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toPeer3", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.setUser("user"); + amqpConnection.setPassword("pass"); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().addProperty(TUNNEL_CORE_MESSAGES, Boolean.toString(tunneling)).setQueueCreation(true).setAddressFilter(getQueueName() + ",sometest")); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer", "tcp://localhost:" + BROKER_PORT_NUM); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName())); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(50); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()).setNoForward(true)); + server.getConfiguration().addAMQPConnection(amqpConnection); + } + + // connect the topology + server_2.start(); + server.start(); + peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Create queues & send messages on server, nothing will reach peer_3 + createAddressAndQueues(server); + Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server.locateQueue(getQueueName()) 
!= null); + + final org.apache.activemq.artemis.core.server.Queue q1 = server.locateQueue(getQueueName()); + assertNotNull(q1); + + final org.apache.activemq.artemis.core.server.Queue q2 = server_2.locateQueue(getQueueName()); + assertNotNull(q2); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + BROKER_PORT_NUM); + final ConnectionFactory factory_2 = CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + AMQP_PORT_2); + + String message = "A message!"; + if (longMessage) { + String sLongMessage = ""; + for (int i = 0; i < 11 * 1014; i++) { + sLongMessage += message; + } + message = sLongMessage; + } Review Comment: Probably worth just using a utility method that takes a length, builds a string that hits it, and returns the value. Then call it passing some value based on the configured threshold (which, coupled with the last comment, could reference a variable to make things clearer later). Even now, the use of 11 * 1014 isn't exactly obvious. Though it's only a test, it would probably be nicer to use a StringBuilder rather than concat. ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPNoForwardMirroringTest.java: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp.connect; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class AMQPNoForwardMirroringTest extends AmqpClientTestSupport { Review Comment: Most of the tests added in the previous/existing class, would actually be obvious candidates for a class with this name. Perhaps a helpful comment here noting their presence in the other class and to also see it. 
########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java: ########## @@ -229,6 +271,173 @@ public void testBrokerAddsAddressAndQueue() throws Exception { } } + @Override + protected void configureAMQPAcceptorParameters(Map<String, Object> params) { + params.put("amqpMinLargeMessageSize", 100 * 1024); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksMessagesAndControlsPropagation() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(false, false); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksMessagesAndControlsPropagationWithTunneling() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(true, false); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksLargeMessagesAndControlsPropagation() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(false, true); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksLargeMessagesAndControlsPropagationWithTunneling() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(true, true); + } + + private void doTestNoForwardBlocksMessagesAndControlsPropagation(boolean tunneling, boolean longMessage) throws Exception { + final Map<String, Object> brokerProperties = new HashMap<>(); + brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker"); + + final String[] capabilities; + ArrayList<String> capabilitiesList = new ArrayList<>(); + int messageFormat = 0; + + capabilitiesList.add("amq.mirror"); + if (tunneling) { + capabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()); + messageFormat = AMQP_TUNNELED_CORE_MESSAGE_FORMAT; + } + capabilities = capabilitiesList.toArray(new String[]{}); + + // Topology of the test: server -(noForward)-> server_2 -> peer_3 + try (ProtonTestServer peer_3 = new ProtonTestServer()) { + peer_3.expectSASLPlainConnect("user", "pass", "PLAIN", 
"ANONYMOUS"); + peer_3.expectOpen().respond(); + peer_3.expectBegin().respond(); + peer_3.expectAttach().ofSender() + .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR")) + .withDesiredCapabilities(capabilities) + .respond() + .withOfferedCapabilities(capabilities) + .withPropertiesMap(brokerProperties); + peer_3.remoteFlow().withLinkCredit(10).queue(); + peer_3.start(); + + final URI remoteURI = peer_3.getServerURI(); + logger.info("Connect test started, peer listening on: {}", remoteURI); + + final int AMQP_PORT_2 = BROKER_PORT_NUM + 1; + final ActiveMQServer server_2 = createServer(AMQP_PORT_2, false); + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toPeer3", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.setUser("user"); + amqpConnection.setPassword("pass"); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().addProperty(TUNNEL_CORE_MESSAGES, Boolean.toString(tunneling)).setQueueCreation(true).setAddressFilter(getQueueName() + ",sometest")); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer", "tcp://localhost:" + BROKER_PORT_NUM); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName())); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(50); Review Comment: Should put a bound on the reconnects based on some reasonable overall timeout. 
########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java: ########## @@ -229,6 +271,173 @@ public void testBrokerAddsAddressAndQueue() throws Exception { } } + @Override + protected void configureAMQPAcceptorParameters(Map<String, Object> params) { + params.put("amqpMinLargeMessageSize", 100 * 1024); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksMessagesAndControlsPropagation() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(false, false); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksMessagesAndControlsPropagationWithTunneling() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(true, false); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksLargeMessagesAndControlsPropagation() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(false, true); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksLargeMessagesAndControlsPropagationWithTunneling() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(true, true); + } + + private void doTestNoForwardBlocksMessagesAndControlsPropagation(boolean tunneling, boolean longMessage) throws Exception { + final Map<String, Object> brokerProperties = new HashMap<>(); + brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker"); + + final String[] capabilities; + ArrayList<String> capabilitiesList = new ArrayList<>(); + int messageFormat = 0; + + capabilitiesList.add("amq.mirror"); + if (tunneling) { + capabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()); + messageFormat = AMQP_TUNNELED_CORE_MESSAGE_FORMAT; + } + capabilities = capabilitiesList.toArray(new String[]{}); + + // Topology of the test: server -(noForward)-> server_2 -> peer_3 + try (ProtonTestServer peer_3 = new ProtonTestServer()) { + peer_3.expectSASLPlainConnect("user", "pass", "PLAIN", 
"ANONYMOUS"); + peer_3.expectOpen().respond(); + peer_3.expectBegin().respond(); + peer_3.expectAttach().ofSender() + .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR")) + .withDesiredCapabilities(capabilities) + .respond() + .withOfferedCapabilities(capabilities) + .withPropertiesMap(brokerProperties); + peer_3.remoteFlow().withLinkCredit(10).queue(); + peer_3.start(); + + final URI remoteURI = peer_3.getServerURI(); + logger.info("Connect test started, peer listening on: {}", remoteURI); + + final int AMQP_PORT_2 = BROKER_PORT_NUM + 1; + final ActiveMQServer server_2 = createServer(AMQP_PORT_2, false); + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toPeer3", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.setUser("user"); + amqpConnection.setPassword("pass"); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().addProperty(TUNNEL_CORE_MESSAGES, Boolean.toString(tunneling)).setQueueCreation(true).setAddressFilter(getQueueName() + ",sometest")); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer", "tcp://localhost:" + BROKER_PORT_NUM); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName())); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(50); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()).setNoForward(true)); + server.getConfiguration().addAMQPConnection(amqpConnection); + } + + // connect the topology + server_2.start(); + server.start(); + peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // 
Create queues & send messages on server, nothing will reach peer_3 + createAddressAndQueues(server); + Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null); + + final org.apache.activemq.artemis.core.server.Queue q1 = server.locateQueue(getQueueName()); + assertNotNull(q1); + + final org.apache.activemq.artemis.core.server.Queue q2 = server_2.locateQueue(getQueueName()); + assertNotNull(q2); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + BROKER_PORT_NUM); + final ConnectionFactory factory_2 = CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + AMQP_PORT_2); + + String message = "A message!"; + if (longMessage) { + String sLongMessage = ""; + for (int i = 0; i < 11 * 1014; i++) { + sLongMessage += message; + } + message = sLongMessage; + } + + try (Connection conn = factory.createConnection()) { + final Session session = conn.createSession(); + conn.start(); + + final MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + producer.send(session.createTextMessage(message)); + producer.close(); + } + + org.apache.activemq.artemis.utils.Wait.assertEquals(1L, q1::getMessageCount, 100, 100); + org.apache.activemq.artemis.utils.Wait.assertEquals(1L, q2::getMessageCount, 100, 100); + + try (Connection conn = factory_2.createConnection()) { + final Session session = conn.createSession(); + conn.start(); + + final MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + TextMessage rcvMsg = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals(message, rcvMsg.getText()); + consumer.close(); + } + + org.apache.activemq.artemis.utils.Wait.assertEquals(0L, q2::getMessageCount, 100, 100); + org.apache.activemq.artemis.utils.Wait.assertEquals(0L, q1::getMessageCount, 6000, 100); + + // give some time to peer_3 to receive messages (if any) + 
Thread.sleep(100); + peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS); // if messages are received here, this should error out + + // Then send messages on the broker directly connected to the peer, the messages should make it to the peer. + // Receiving these 3 messages in that order confirms that no previous data reched the Peer, therefore validating + // the test. + peer_3.expectTransfer().accept(); // Address create + peer_3.expectTransfer().accept(); // Queue create + peer_3.expectTransfer().withMessageFormat(messageFormat).accept(); // Producer Message Review Comment: Could also verify the text body of the message to better validate things. ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSquareMirroringTest.java: ########## @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.tests.integration.amqp.connect; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class AMQPSquareMirroringTest extends AmqpClientTestSupport { + + protected static final int AMQP_PORT_2 = 5673; + protected static final int AMQP_PORT_3 = 5674; + protected static final int AMQP_PORT_4 = 5675; + + ActiveMQServer server_2; + ActiveMQServer server_3; + ActiveMQServer server_4; + + @Override + protected ActiveMQServer createServer() throws Exception { + return createServer(AMQP_PORT, false); + } + + @Override + protected String getConfiguredProtocols() { + return "AMQP,CORE,OPENWIRE"; + } + + @Test + public void testSquare() throws Exception { + server_2 = createServer(AMQP_PORT_2, false); + server_3 = createServer(AMQP_PORT_3, false); + server_4 = createServer(AMQP_PORT_4, false); + + // name the servers, for convenience during debugging + server.getConfiguration().setName("1"); + server_2.getConfiguration().setName("2"); + server_3.getConfiguration().setName("3"); + server_4.getConfiguration().setName("4"); + + /** + * + * Setup the mirroring 
topology to be a square: + * + * 1 <- - -> 2 + * ^ ^ The link between 1 and 2 and the + * | | link between 3 and 4 are noForward + * v v links in both directions. + * 4 <- - -> 3 + */ + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "1to2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setNoForward(true)); + server.getConfiguration().addAMQPConnection(amqpConnection); + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "1to4", "tcp://localhost:" + AMQP_PORT_4).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); + server.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "2to1", "tcp://localhost:" + AMQP_PORT).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setNoForward(true)); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "2to3", "tcp://localhost:" + AMQP_PORT_3).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "3to2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); + server_3.getConfiguration().addAMQPConnection(amqpConnection); + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "3to4", "tcp://localhost:" + AMQP_PORT_4).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setNoForward(true)); + 
+ server_3.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "4to1", "tcp://localhost:" + AMQP_PORT).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); + server_4.getConfiguration().addAMQPConnection(amqpConnection); + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "4to3", "tcp://localhost:" + AMQP_PORT_3).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setNoForward(true)); + server_4.getConfiguration().addAMQPConnection(amqpConnection); + } + + server.start(); + server_2.start(); + server_3.start(); + server_4.start(); + + createAddressAndQueues(server); + Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server_3.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server_4.locateQueue(getQueueName()) != null); + + Queue q1 = server.locateQueue(getQueueName()); + assertNotNull(q1); + + Queue q2 = server.locateQueue(getQueueName()); + assertNotNull(q2); + + Queue q3 = server.locateQueue(getQueueName()); + assertNotNull(q3); + + Queue q4 = server.locateQueue(getQueueName()); + assertNotNull(q4); Review Comment: These all use the same queue, as they all call on "server", so the rest of the test isn't validating what it thinks it is when it checks them all. ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPNoForwardMirroringTest.java: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp.connect; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class AMQPNoForwardMirroringTest extends AmqpClientTestSupport { + + protected static final int AMQP_PORT_2 = 5673; + protected static final int AMQP_PORT_3 = 5674; + + ActiveMQServer server_2; + ActiveMQServer server_3; + + @Override + protected ActiveMQServer createServer() throws Exception { + return createServer(AMQP_PORT, false); + } + + @Override + protected String 
getConfiguredProtocols() { + return "AMQP,CORE,OPENWIRE"; + } + + @Test + public void testNoForward() throws Exception { + server_2 = createServer(AMQP_PORT_2, false); + server_3 = createServer(AMQP_PORT_3, false); + + // name the servers, for convenience during debugging + server.getConfiguration().setName("1"); + server_2.getConfiguration().setName("2"); + server_3.getConfiguration().setName("3"); + + /** + * + * the mirroring topology: + * + * v---------| + * 1 ------> 2 The link between 1 and 2 is noForward=true + * ^ + * v + * 3 + */ + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "1to2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()).setNoForward(true)); + server.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "2to1", "tcp://localhost:" + AMQP_PORT).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName())); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "2to3", "tcp://localhost:" + AMQP_PORT_3).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName())); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + } + + server.start(); + server_2.start(); + server_3.start(); + + createAddressAndQueues(server); + Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null); + // queue creation doesn't reach 3 b/c of the noForward link between 1 and 2. 
+ Wait.assertTrue(() -> server_3.locateQueue(getQueueName()) == null); + + Queue q1 = server.locateQueue(getQueueName()); + assertNotNull(q1); + + Queue q2 = server_2.locateQueue(getQueueName()); + assertNotNull(q2); + + ConnectionFactory factory = CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + AMQP_PORT); + ConnectionFactory factory2 = CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + AMQP_PORT_2); + ConnectionFactory factory3 = CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + AMQP_PORT_3); + + // send from 1, 2 receives, 3 don't. + try (Connection conn = factory.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + Wait.assertEquals(10L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q2::getMessageCount, 1000, 100); + + // consume from 2, 1 and 2 counters go back to 0 + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + } + + Wait.assertEquals(0L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q2::getMessageCount, 1000, 100); + + Thread.sleep(100); // some time to allow eventual loops + + // queue creation was originated on server, with noForward in place, + // the messages never reached server_3, for the rest of the test suite, + // we need server_3 to have access to the queue + createAddressAndQueues(server_3); + Wait.assertTrue(() -> server_3.locateQueue(getQueueName()) != null); + Queue q3 = server_3.locateQueue(getQueueName()); + assertNotNull(q3); + + 
// produce on 2. 1, 2 and 3 receive messages. + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + + Wait.assertEquals(10L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q3::getMessageCount, 1000, 100); + + // consume on 1. 1, 2, and 3 counters are back to 0 + try (Connection conn = factory.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + Wait.assertEquals(0L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q3::getMessageCount, 1000, 100); + + // produce on 2. 1, 2 and 3 receive messages. + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + + Wait.assertEquals(10L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q3::getMessageCount, 1000, 100); + + // consume on 3. 1, 2 counters are still at 10 and 3 is at 0. 
+ try (Connection conn = factory3.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + Wait.assertEquals(10L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q3::getMessageCount, 1000, 100); + + // consume on 2. 1, 2 and 3 counters are back to 0 + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + Wait.assertEquals(0L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q3::getMessageCount, 1000, 100); + + // produce on 3. only 3 has messages. + try (Connection conn = factory3.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + + Wait.assertEquals(0L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q3::getMessageCount, 1000, 100); + + // consume on 3. 
1, 2, and 3 counters are back to 0 + try (Connection conn = factory3.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + Wait.assertEquals(0L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q3::getMessageCount, 1000, 100); + + try (Connection conn = factory.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + assertNull(consumer.receiveNoWait()); + consumer.close(); + } + + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + assertNull(consumer.receiveNoWait()); + consumer.close(); + } + + try (Connection conn = factory3.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + assertNull(consumer.receiveNoWait()); + consumer.close(); + } Review Comment: There is a lot of repeated creation of new connections to the same server. I'd be inclined to have a larger try-with-resources that creates the connections and then [re]uses them as needed. Not necessarily for the whole test, but where it makes sense and makes things succinct and more efficient without any notable change in behaviour.
E.g. the 'consume from 1, from 2, from 3... then do the same again to check there is still nothing' steps all seem like they would be functionally the same without the extra connection creations. ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSquareMirroringTest.java: ########## @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.activemq.artemis.tests.integration.amqp.connect; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class AMQPSquareMirroringTest extends AmqpClientTestSupport { + + protected static final int AMQP_PORT_2 = 5673; + protected static final int AMQP_PORT_3 = 5674; + protected static final int AMQP_PORT_4 = 5675; + + ActiveMQServer server_2; + ActiveMQServer server_3; + ActiveMQServer server_4; + + @Override + protected ActiveMQServer createServer() throws Exception { + return createServer(AMQP_PORT, false); + } + + @Override + protected String getConfiguredProtocols() { + return "AMQP,CORE,OPENWIRE"; + } + + @Test + public void testSquare() throws Exception { + server_2 = createServer(AMQP_PORT_2, false); + server_3 = createServer(AMQP_PORT_3, false); + server_4 = createServer(AMQP_PORT_4, false); + + // name the servers, for convenience during debugging + server.getConfiguration().setName("1"); + server_2.getConfiguration().setName("2"); + server_3.getConfiguration().setName("3"); + server_4.getConfiguration().setName("4"); + + /** + * + * Setup the mirroring 
topology to be a square: + * + * 1 <- - -> 2 + * ^ ^ The link between 1 and 2 and the + * | | link between 3 and 4 are noForward + * v v links in both directions. + * 4 <- - -> 3 + */ + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "1to2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(100); Review Comment: It would probably be more readable with connection-specific variable names rather than reusing one (here and in other test classes). There are 8 different connections, with different configurations on top, all named amqpConnection. There isn't even a space between the two in each block here. ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java: ########## @@ -229,6 +271,173 @@ public void testBrokerAddsAddressAndQueue() throws Exception { } } + @Override + protected void configureAMQPAcceptorParameters(Map<String, Object> params) { + params.put("amqpMinLargeMessageSize", 100 * 1024); Review Comment: Could be worth adding a field that can be referenced for later clarity. Though now that I look at the rest of the test, I'd note that this has no effect on Core messages; the Core client decides what is large/not with client-side config, so it might be worth configuring the Core clients also.
########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java: ########## @@ -229,6 +271,173 @@ public void testBrokerAddsAddressAndQueue() throws Exception { } } + @Override + protected void configureAMQPAcceptorParameters(Map<String, Object> params) { + params.put("amqpMinLargeMessageSize", 100 * 1024); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksMessagesAndControlsPropagation() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(false, false); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksMessagesAndControlsPropagationWithTunneling() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(true, false); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksLargeMessagesAndControlsPropagation() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(false, true); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksLargeMessagesAndControlsPropagationWithTunneling() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(true, true); + } + + private void doTestNoForwardBlocksMessagesAndControlsPropagation(boolean tunneling, boolean longMessage) throws Exception { + final Map<String, Object> brokerProperties = new HashMap<>(); + brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker"); + + final String[] capabilities; + ArrayList<String> capabilitiesList = new ArrayList<>(); + int messageFormat = 0; + + capabilitiesList.add("amq.mirror"); + if (tunneling) { + capabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()); + messageFormat = AMQP_TUNNELED_CORE_MESSAGE_FORMAT; + } + capabilities = capabilitiesList.toArray(new String[]{}); + + // Topology of the test: server -(noForward)-> server_2 -> peer_3 Review Comment: This doesn't match what happens below; server 2 has 2 mirrors.
########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java: ########## @@ -229,6 +271,173 @@ public void testBrokerAddsAddressAndQueue() throws Exception { } } + @Override + protected void configureAMQPAcceptorParameters(Map<String, Object> params) { + params.put("amqpMinLargeMessageSize", 100 * 1024); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksMessagesAndControlsPropagation() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(false, false); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksMessagesAndControlsPropagationWithTunneling() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(true, false); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksLargeMessagesAndControlsPropagation() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(false, true); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksLargeMessagesAndControlsPropagationWithTunneling() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(true, true); + } + + private void doTestNoForwardBlocksMessagesAndControlsPropagation(boolean tunneling, boolean longMessage) throws Exception { + final Map<String, Object> brokerProperties = new HashMap<>(); + brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker"); + + final String[] capabilities; + ArrayList<String> capabilitiesList = new ArrayList<>(); + int messageFormat = 0; + + capabilitiesList.add("amq.mirror"); + if (tunneling) { + capabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()); + messageFormat = AMQP_TUNNELED_CORE_MESSAGE_FORMAT; + } + capabilities = capabilitiesList.toArray(new String[]{}); + + // Topology of the test: server -(noForward)-> server_2 -> peer_3 + try (ProtonTestServer peer_3 = new ProtonTestServer()) { + peer_3.expectSASLPlainConnect("user", "pass", "PLAIN", 
"ANONYMOUS"); + peer_3.expectOpen().respond(); + peer_3.expectBegin().respond(); + peer_3.expectAttach().ofSender() + .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR")) + .withDesiredCapabilities(capabilities) + .respond() + .withOfferedCapabilities(capabilities) + .withPropertiesMap(brokerProperties); + peer_3.remoteFlow().withLinkCredit(10).queue(); + peer_3.start(); + + final URI remoteURI = peer_3.getServerURI(); + logger.info("Connect test started, peer listening on: {}", remoteURI); + + final int AMQP_PORT_2 = BROKER_PORT_NUM + 1; + final ActiveMQServer server_2 = createServer(AMQP_PORT_2, false); + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toPeer3", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.setUser("user"); + amqpConnection.setPassword("pass"); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().addProperty(TUNNEL_CORE_MESSAGES, Boolean.toString(tunneling)).setQueueCreation(true).setAddressFilter(getQueueName() + ",sometest")); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer", "tcp://localhost:" + BROKER_PORT_NUM); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName())); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(50); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()).setNoForward(true)); + server.getConfiguration().addAMQPConnection(amqpConnection); + } + + // connect the topology + server_2.start(); + server.start(); + peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // 
Create queues & send messages on server, nothing will reach peer_3 + createAddressAndQueues(server); + Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null); + + final org.apache.activemq.artemis.core.server.Queue q1 = server.locateQueue(getQueueName()); + assertNotNull(q1); + + final org.apache.activemq.artemis.core.server.Queue q2 = server_2.locateQueue(getQueueName()); + assertNotNull(q2); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + BROKER_PORT_NUM); + final ConnectionFactory factory_2 = CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + AMQP_PORT_2); + + String message = "A message!"; + if (longMessage) { + String sLongMessage = ""; + for (int i = 0; i < 11 * 1014; i++) { + sLongMessage += message; + } + message = sLongMessage; + } + + try (Connection conn = factory.createConnection()) { + final Session session = conn.createSession(); + conn.start(); + + final MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + producer.send(session.createTextMessage(message)); + producer.close(); + } + + org.apache.activemq.artemis.utils.Wait.assertEquals(1L, q1::getMessageCount, 100, 100); + org.apache.activemq.artemis.utils.Wait.assertEquals(1L, q2::getMessageCount, 100, 100); + + try (Connection conn = factory_2.createConnection()) { + final Session session = conn.createSession(); + conn.start(); + + final MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + TextMessage rcvMsg = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals(message, rcvMsg.getText()); + consumer.close(); + } + + org.apache.activemq.artemis.utils.Wait.assertEquals(0L, q2::getMessageCount, 100, 100); + org.apache.activemq.artemis.utils.Wait.assertEquals(0L, q1::getMessageCount, 6000, 100); Review Comment: Ditto ########## 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java: ########## @@ -229,6 +271,173 @@ public void testBrokerAddsAddressAndQueue() throws Exception { } } + @Override + protected void configureAMQPAcceptorParameters(Map<String, Object> params) { + params.put("amqpMinLargeMessageSize", 100 * 1024); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksMessagesAndControlsPropagation() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(false, false); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksMessagesAndControlsPropagationWithTunneling() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(true, false); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksLargeMessagesAndControlsPropagation() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(false, true); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksLargeMessagesAndControlsPropagationWithTunneling() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(true, true); + } + + private void doTestNoForwardBlocksMessagesAndControlsPropagation(boolean tunneling, boolean longMessage) throws Exception { + final Map<String, Object> brokerProperties = new HashMap<>(); + brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker"); + + final String[] capabilities; + ArrayList<String> capabilitiesList = new ArrayList<>(); + int messageFormat = 0; + + capabilitiesList.add("amq.mirror"); + if (tunneling) { + capabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()); + messageFormat = AMQP_TUNNELED_CORE_MESSAGE_FORMAT; + } + capabilities = capabilitiesList.toArray(new String[]{}); + + // Topology of the test: server -(noForward)-> server_2 -> peer_3 + try (ProtonTestServer peer_3 = new ProtonTestServer()) { + peer_3.expectSASLPlainConnect("user", "pass", "PLAIN", "ANONYMOUS"); + 
peer_3.expectOpen().respond(); + peer_3.expectBegin().respond(); + peer_3.expectAttach().ofSender() + .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR")) + .withDesiredCapabilities(capabilities) + .respond() + .withOfferedCapabilities(capabilities) + .withPropertiesMap(brokerProperties); + peer_3.remoteFlow().withLinkCredit(10).queue(); + peer_3.start(); + + final URI remoteURI = peer_3.getServerURI(); + logger.info("Connect test started, peer listening on: {}", remoteURI); + + final int AMQP_PORT_2 = BROKER_PORT_NUM + 1; + final ActiveMQServer server_2 = createServer(AMQP_PORT_2, false); + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toPeer3", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.setUser("user"); + amqpConnection.setPassword("pass"); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().addProperty(TUNNEL_CORE_MESSAGES, Boolean.toString(tunneling)).setQueueCreation(true).setAddressFilter(getQueueName() + ",sometest")); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer", "tcp://localhost:" + BROKER_PORT_NUM); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName())); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(50); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()).setNoForward(true)); + server.getConfiguration().addAMQPConnection(amqpConnection); + } + + // connect the topology + server_2.start(); + server.start(); + peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Create queues & 
send messages on server, nothing will reach peer_3 + createAddressAndQueues(server); + Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null); + + final org.apache.activemq.artemis.core.server.Queue q1 = server.locateQueue(getQueueName()); + assertNotNull(q1); + + final org.apache.activemq.artemis.core.server.Queue q2 = server_2.locateQueue(getQueueName()); + assertNotNull(q2); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + BROKER_PORT_NUM); + final ConnectionFactory factory_2 = CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + AMQP_PORT_2); + + String message = "A message!"; + if (longMessage) { + String sLongMessage = ""; + for (int i = 0; i < 11 * 1014; i++) { + sLongMessage += message; + } + message = sLongMessage; + } + + try (Connection conn = factory.createConnection()) { + final Session session = conn.createSession(); + conn.start(); + + final MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + producer.send(session.createTextMessage(message)); + producer.close(); + } + + org.apache.activemq.artemis.utils.Wait.assertEquals(1L, q1::getMessageCount, 100, 100); + org.apache.activemq.artemis.utils.Wait.assertEquals(1L, q2::getMessageCount, 100, 100); Review Comment: There doesn't seem to be a need for the fully qualified name; earlier in the method it is using just Wait.
########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java: ########## @@ -229,6 +271,173 @@ public void testBrokerAddsAddressAndQueue() throws Exception { } } + @Override + protected void configureAMQPAcceptorParameters(Map<String, Object> params) { + params.put("amqpMinLargeMessageSize", 100 * 1024); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksMessagesAndControlsPropagation() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(false, false); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksMessagesAndControlsPropagationWithTunneling() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(true, false); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksLargeMessagesAndControlsPropagation() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(false, true); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksLargeMessagesAndControlsPropagationWithTunneling() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(true, true); + } + + private void doTestNoForwardBlocksMessagesAndControlsPropagation(boolean tunneling, boolean longMessage) throws Exception { + final Map<String, Object> brokerProperties = new HashMap<>(); + brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker"); + + final String[] capabilities; + ArrayList<String> capabilitiesList = new ArrayList<>(); + int messageFormat = 0; + + capabilitiesList.add("amq.mirror"); + if (tunneling) { + capabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()); + messageFormat = AMQP_TUNNELED_CORE_MESSAGE_FORMAT; + } + capabilities = capabilitiesList.toArray(new String[]{}); + + // Topology of the test: server -(noForward)-> server_2 -> peer_3 + try (ProtonTestServer peer_3 = new ProtonTestServer()) { + peer_3.expectSASLPlainConnect("user", "pass", "PLAIN", 
"ANONYMOUS"); + peer_3.expectOpen().respond(); + peer_3.expectBegin().respond(); + peer_3.expectAttach().ofSender() + .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR")) + .withDesiredCapabilities(capabilities) + .respond() + .withOfferedCapabilities(capabilities) + .withPropertiesMap(brokerProperties); + peer_3.remoteFlow().withLinkCredit(10).queue(); + peer_3.start(); + + final URI remoteURI = peer_3.getServerURI(); + logger.info("Connect test started, peer listening on: {}", remoteURI); + + final int AMQP_PORT_2 = BROKER_PORT_NUM + 1; + final ActiveMQServer server_2 = createServer(AMQP_PORT_2, false); + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toPeer3", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.setUser("user"); + amqpConnection.setPassword("pass"); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().addProperty(TUNNEL_CORE_MESSAGES, Boolean.toString(tunneling)).setQueueCreation(true).setAddressFilter(getQueueName() + ",sometest")); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer", "tcp://localhost:" + BROKER_PORT_NUM); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName())); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(50); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()).setNoForward(true)); + server.getConfiguration().addAMQPConnection(amqpConnection); + } + + // connect the topology + server_2.start(); + server.start(); + peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // 
Create queues & send messages on server, nothing will reach peer_3 + createAddressAndQueues(server); + Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null); + + final org.apache.activemq.artemis.core.server.Queue q1 = server.locateQueue(getQueueName()); + assertNotNull(q1); + + final org.apache.activemq.artemis.core.server.Queue q2 = server_2.locateQueue(getQueueName()); + assertNotNull(q2); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + BROKER_PORT_NUM); + final ConnectionFactory factory_2 = CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + AMQP_PORT_2); Review Comment: Probably worth renaming the test to make clear it is Core large messages being sent originally. Equally, if controlling the AMQP large message size (which, here, will only matter for the non-tunnelled case as mirrored messages arrive at server 2 via server 1) then it probably makes sense to configure the Core client large message size (via its URI, e.g. add "?minLargeMessageSize=\<bytes-size\>"). Probably worth having tests with large AMQP messages also.
########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java: ########## @@ -229,6 +271,173 @@ public void testBrokerAddsAddressAndQueue() throws Exception { } } + @Override + protected void configureAMQPAcceptorParameters(Map<String, Object> params) { + params.put("amqpMinLargeMessageSize", 100 * 1024); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksMessagesAndControlsPropagation() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(false, false); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksMessagesAndControlsPropagationWithTunneling() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(true, false); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksLargeMessagesAndControlsPropagation() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(false, true); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksLargeMessagesAndControlsPropagationWithTunneling() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(true, true); + } + + private void doTestNoForwardBlocksMessagesAndControlsPropagation(boolean tunneling, boolean longMessage) throws Exception { + final Map<String, Object> brokerProperties = new HashMap<>(); + brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker"); + + final String[] capabilities; + ArrayList<String> capabilitiesList = new ArrayList<>(); + int messageFormat = 0; + + capabilitiesList.add("amq.mirror"); + if (tunneling) { + capabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()); + messageFormat = AMQP_TUNNELED_CORE_MESSAGE_FORMAT; + } + capabilities = capabilitiesList.toArray(new String[]{}); + + // Topology of the test: server -(noForward)-> server_2 -> peer_3 + try (ProtonTestServer peer_3 = new ProtonTestServer()) { + peer_3.expectSASLPlainConnect("user", "pass", "PLAIN", 
"ANONYMOUS"); + peer_3.expectOpen().respond(); + peer_3.expectBegin().respond(); + peer_3.expectAttach().ofSender() + .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR")) + .withDesiredCapabilities(capabilities) + .respond() + .withOfferedCapabilities(capabilities) + .withPropertiesMap(brokerProperties); + peer_3.remoteFlow().withLinkCredit(10).queue(); + peer_3.start(); + + final URI remoteURI = peer_3.getServerURI(); + logger.info("Connect test started, peer listening on: {}", remoteURI); + + final int AMQP_PORT_2 = BROKER_PORT_NUM + 1; + final ActiveMQServer server_2 = createServer(AMQP_PORT_2, false); + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toPeer3", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.setUser("user"); + amqpConnection.setPassword("pass"); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().addProperty(TUNNEL_CORE_MESSAGES, Boolean.toString(tunneling)).setQueueCreation(true).setAddressFilter(getQueueName() + ",sometest")); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer", "tcp://localhost:" + BROKER_PORT_NUM); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName())); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(50); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()).setNoForward(true)); + server.getConfiguration().addAMQPConnection(amqpConnection); + } + + // connect the topology + server_2.start(); + server.start(); + peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // 
Create queues & send messages on server, nothing will reach peer_3 + createAddressAndQueues(server); + Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null); + + final org.apache.activemq.artemis.core.server.Queue q1 = server.locateQueue(getQueueName()); + assertNotNull(q1); + + final org.apache.activemq.artemis.core.server.Queue q2 = server_2.locateQueue(getQueueName()); + assertNotNull(q2); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + BROKER_PORT_NUM); + final ConnectionFactory factory_2 = CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + AMQP_PORT_2); + + String message = "A message!"; + if (longMessage) { + String sLongMessage = ""; + for (int i = 0; i < 11 * 1014; i++) { + sLongMessage += message; + } + message = sLongMessage; + } + + try (Connection conn = factory.createConnection()) { + final Session session = conn.createSession(); + conn.start(); + + final MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + producer.send(session.createTextMessage(message)); + producer.close(); + } + + org.apache.activemq.artemis.utils.Wait.assertEquals(1L, q1::getMessageCount, 100, 100); + org.apache.activemq.artemis.utils.Wait.assertEquals(1L, q2::getMessageCount, 100, 100); + + try (Connection conn = factory_2.createConnection()) { + final Session session = conn.createSession(); + conn.start(); + + final MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + TextMessage rcvMsg = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals(message, rcvMsg.getText()); + consumer.close(); + } + + org.apache.activemq.artemis.utils.Wait.assertEquals(0L, q2::getMessageCount, 100, 100); + org.apache.activemq.artemis.utils.Wait.assertEquals(0L, q1::getMessageCount, 6000, 100); + + // give some time to peer_3 to receive messages (if any) + 
Thread.sleep(100); + peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS); // if messages are received here, this should error out + + // Then send messages on the broker directly connected to the peer, the messages should make it to the peer. + // Receiving these 3 messages in that order confirms that no previous data reched the Peer, therefore validating + // the test. + peer_3.expectTransfer().accept(); // Address create + peer_3.expectTransfer().accept(); // Queue create + peer_3.expectTransfer().withMessageFormat(messageFormat).accept(); // Producer Message + + server_2.addAddressInfo(new AddressInfo(SimpleString.of("sometest"), RoutingType.ANYCAST)); + server_2.createQueue(QueueConfiguration.of("sometest").setRoutingType(RoutingType.ANYCAST)); + + try (Connection connection = factory_2.createConnection()) { + final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + final MessageProducer producer = session.createProducer(session.createQueue("sometest")); + final TextMessage msg = session.createTextMessage("test"); + + connection.start(); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); Review Comment: This is the default, so it can be dropped for conciseness. You don't actually need to start a producer-only connection either. ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPNoForwardMirroringTest.java: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp.connect; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class AMQPNoForwardMirroringTest extends AmqpClientTestSupport { + + protected static final int AMQP_PORT_2 = 5673; + protected static final int AMQP_PORT_3 = 5674; + + ActiveMQServer server_2; + ActiveMQServer server_3; + + @Override + protected ActiveMQServer createServer() throws Exception { + return createServer(AMQP_PORT, false); + } + + @Override + protected String getConfiguredProtocols() { + return "AMQP,CORE,OPENWIRE"; + } + + @Test + public void testNoForward() throws Exception { + server_2 = createServer(AMQP_PORT_2, false); + server_3 = 
createServer(AMQP_PORT_3, false); + + // name the servers, for convenience during debugging + server.getConfiguration().setName("1"); + server_2.getConfiguration().setName("2"); + server_3.getConfiguration().setName("3"); + + /** + * + * the mirroring topology: + * + * v---------| + * 1 ------> 2 The link between 1 and 2 is noForward=true Review Comment: Adding another line between these, to allow a more obvious [downward] side here, would probably make this clearer. You could also make it wider and embed the (noForward) in there like the other test class comment. It's also inconsistent with the use of line types denoting the noForward config in the other, later test class; it would be good to make the classes illustrate things the same way for clarity. ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java: ########## @@ -229,6 +271,173 @@ public void testBrokerAddsAddressAndQueue() throws Exception { } } + @Override + protected void configureAMQPAcceptorParameters(Map<String, Object> params) { + params.put("amqpMinLargeMessageSize", 100 * 1024); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksMessagesAndControlsPropagation() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(false, false); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksMessagesAndControlsPropagationWithTunneling() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(true, false); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksLargeMessagesAndControlsPropagation() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(false, true); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksLargeMessagesAndControlsPropagationWithTunneling() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(true, true); + } + + private void doTestNoForwardBlocksMessagesAndControlsPropagation(boolean
tunneling, boolean longMessage) throws Exception { + final Map<String, Object> brokerProperties = new HashMap<>(); + brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker"); + + final String[] capabilities; + ArrayList<String> capabilitiesList = new ArrayList<>(); + int messageFormat = 0; + + capabilitiesList.add("amq.mirror"); + if (tunneling) { + capabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()); + messageFormat = AMQP_TUNNELED_CORE_MESSAGE_FORMAT; + } + capabilities = capabilitiesList.toArray(new String[]{}); + + // Topology of the test: server -(noForward)-> server_2 -> peer_3 + try (ProtonTestServer peer_3 = new ProtonTestServer()) { + peer_3.expectSASLPlainConnect("user", "pass", "PLAIN", "ANONYMOUS"); + peer_3.expectOpen().respond(); + peer_3.expectBegin().respond(); + peer_3.expectAttach().ofSender() + .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR")) + .withDesiredCapabilities(capabilities) + .respond() + .withOfferedCapabilities(capabilities) + .withPropertiesMap(brokerProperties); + peer_3.remoteFlow().withLinkCredit(10).queue(); + peer_3.start(); + + final URI remoteURI = peer_3.getServerURI(); + logger.info("Connect test started, peer listening on: {}", remoteURI); + + final int AMQP_PORT_2 = BROKER_PORT_NUM + 1; + final ActiveMQServer server_2 = createServer(AMQP_PORT_2, false); + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toPeer3", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.setUser("user"); + amqpConnection.setPassword("pass"); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().addProperty(TUNNEL_CORE_MESSAGES, Boolean.toString(tunneling)).setQueueCreation(true).setAddressFilter(getQueueName() + ",sometest")); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer", "tcp://localhost:" + BROKER_PORT_NUM); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName())); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(50); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()).setNoForward(true)); + server.getConfiguration().addAMQPConnection(amqpConnection); + } + + // connect the topology + server_2.start(); + server.start(); + peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Create queues & send messages on server, nothing will reach peer_3 + createAddressAndQueues(server); + Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null); + + final org.apache.activemq.artemis.core.server.Queue q1 = server.locateQueue(getQueueName()); + assertNotNull(q1); + + final org.apache.activemq.artemis.core.server.Queue q2 = server_2.locateQueue(getQueueName()); + assertNotNull(q2); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + BROKER_PORT_NUM); + final ConnectionFactory factory_2 = CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + AMQP_PORT_2); + + String message = "A message!"; + if (longMessage) { + String sLongMessage = ""; + for (int i = 0; i < 11 * 1014; i++) { + sLongMessage += message; + } + message = sLongMessage; + } + + try (Connection conn = factory.createConnection()) { + final Session session = conn.createSession(); + conn.start(); + + final MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + producer.send(session.createTextMessage(message)); + producer.close(); + } + + 
org.apache.activemq.artemis.utils.Wait.assertEquals(1L, q1::getMessageCount, 100, 100); + org.apache.activemq.artemis.utils.Wait.assertEquals(1L, q2::getMessageCount, 100, 100); + + try (Connection conn = factory_2.createConnection()) { + final Session session = conn.createSession(); + conn.start(); + + final MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + TextMessage rcvMsg = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals(message, rcvMsg.getText()); + consumer.close(); + } + + org.apache.activemq.artemis.utils.Wait.assertEquals(0L, q2::getMessageCount, 100, 100); + org.apache.activemq.artemis.utils.Wait.assertEquals(0L, q1::getMessageCount, 6000, 100); + + // give some time to peer_3 to receive messages (if any) + Thread.sleep(100); + peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS); // if messages are received here, this should error out + + // Then send messages on the broker directly connected to the peer, the messages should make it to the peer. + // Receiving these 3 messages in that order confirms that no previous data reched the Peer, therefore validating + // the test. + peer_3.expectTransfer().accept(); // Address create + peer_3.expectTransfer().accept(); // Queue create + peer_3.expectTransfer().withMessageFormat(messageFormat).accept(); // Producer Message + + server_2.addAddressInfo(new AddressInfo(SimpleString.of("sometest"), RoutingType.ANYCAST)); + server_2.createQueue(QueueConfiguration.of("sometest").setRoutingType(RoutingType.ANYCAST)); Review Comment: Could probably use something more descriptive than "sometest". Either way, a variable would probably make it easier to follow where it is used, e.g. in the mirror config for filtering, and again here/below.
########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSquareMirroringTest.java: ########## @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp.connect; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class 
AMQPSquareMirroringTest extends AmqpClientTestSupport { + + protected static final int AMQP_PORT_2 = 5673; + protected static final int AMQP_PORT_3 = 5674; + protected static final int AMQP_PORT_4 = 5675; + + ActiveMQServer server_2; + ActiveMQServer server_3; + ActiveMQServer server_4; + + @Override + protected ActiveMQServer createServer() throws Exception { + return createServer(AMQP_PORT, false); + } + + @Override + protected String getConfiguredProtocols() { + return "AMQP,CORE,OPENWIRE"; + } + + @Test + public void testSquare() throws Exception { + server_2 = createServer(AMQP_PORT_2, false); + server_3 = createServer(AMQP_PORT_3, false); + server_4 = createServer(AMQP_PORT_4, false); + + // name the servers, for convenience during debugging + server.getConfiguration().setName("1"); + server_2.getConfiguration().setName("2"); + server_3.getConfiguration().setName("3"); + server_4.getConfiguration().setName("4"); + + /** + * + * Setup the mirroring topology to be a square: + * + * 1 <- - -> 2 + * ^ ^ The link between 1 and 2 and the + * | | link between 3 and 4 are noForward + * v v links in both directions. + * 4 <- - -> 3 + */ + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "1to2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setNoForward(true)); + server.getConfiguration().addAMQPConnection(amqpConnection); + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "1to4", "tcp://localhost:" + AMQP_PORT_4).setRetryInterval(100); Review Comment: Similarly, should gate retries. ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPNoForwardMirroringTest.java: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp.connect; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class AMQPNoForwardMirroringTest extends AmqpClientTestSupport { + + protected static final int AMQP_PORT_2 = 5673; + protected static final int AMQP_PORT_3 = 5674; + + ActiveMQServer server_2; + ActiveMQServer server_3; + + @Override + protected ActiveMQServer createServer() throws 
Exception { + return createServer(AMQP_PORT, false); + } + + @Override + protected String getConfiguredProtocols() { + return "AMQP,CORE,OPENWIRE"; + } + + @Test + public void testNoForward() throws Exception { + server_2 = createServer(AMQP_PORT_2, false); + server_3 = createServer(AMQP_PORT_3, false); + + // name the servers, for convenience during debugging + server.getConfiguration().setName("1"); + server_2.getConfiguration().setName("2"); + server_3.getConfiguration().setName("3"); + + /** + * + * the mirroring topology: + * + * v---------| + * 1 ------> 2 The link between 1 and 2 is noForward=true + * ^ + * v + * 3 + */ + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "1to2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(100); Review Comment: Should add a gate on the retries (here and below). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For additional commands, e-mail: gitbox-h...@activemq.apache.org For further information, visit: https://activemq.apache.org/contact