[
https://issues.apache.org/jira/browse/ARTEMIS-3759?focusedWorklogId=763572&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-763572
]
ASF GitHub Bot logged work on ARTEMIS-3759:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 28/Apr/22 15:07
Start Date: 28/Apr/22 15:07
Worklog Time Spent: 10m
Work Description: iliya-gr commented on code in PR #4054:
URL: https://github.com/apache/activemq-artemis/pull/4054#discussion_r861006446
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java:
##########
@@ -473,6 +473,73 @@ public void testNoAddressWithAnnotations() throws
Exception {
}
}
+ @Test
+ public void testAddressFilter() throws Exception {
+ server.start();
+
+ server_2 = createServer(AMQP_PORT_2, false);
+ server_2.setIdentity("server_2");
+ server_2.getConfiguration().setName("server_2");
+
+ AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("mirror-source", "tcp://localhost:" +
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+ AMQPMirrorBrokerConnectionElement replica = new
AMQPMirrorBrokerConnectionElement().setDurable(true).setAddressFilter("replicated,!nonReplicated");
+ amqpConnection.addElement(replica);
+ server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+ server_2.start();
+
+ {
+ // Send to nonReplicated address
+ ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP",
"tcp://localhost:" + AMQP_PORT_2);
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer =
session.createProducer(session.createQueue("nonReplicated"));
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ producer.send(session.createTextMessage("never receive"));
+ connection.close();
+ }
+
+ // Check nothing was added to SnF queue
+ Assert.assertEquals(0,
server_2.locateQueue(replica.getMirrorSNF()).getMessagesAdded());
+
+ {
+ // Send to replicated address
+ ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP",
"tcp://localhost:" + AMQP_PORT_2);
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer =
session.createProducer(session.createQueue("replicated"));
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ producer.send(session.createTextMessage("will receive"));
+ connection.close();
+ }
+
+ // Check some messages were sent to SnF queue
+
Assert.assertTrue(server_2.locateQueue(replica.getMirrorSNF()).getMessagesAdded()
> 0);
+
+ {
+ ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP",
"tcp://localhost:" + AMQP_PORT);
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ connection.start();
+ MessageConsumer consumer =
session.createConsumer(session.createQueue("replicated"));
+ Message message = consumer.receive(3000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("will receive", message.getBody(String.class));
+ connection.close();
+ }
+
+ {
+ ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP",
"tcp://localhost:" + AMQP_PORT);
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ connection.start();
+ MessageConsumer consumer =
session.createConsumer(session.createQueue("replicated"));
Review Comment:
Indeed, it should be `nonReplicated`. I have added double send to prevent
such mistype.
Issue Time Tracking
-------------------
Worklog Id: (was: 763572)
Time Spent: 1h 50m (was: 1h 40m)
> Allow for Mirroring (Broker Connections) to specify a specific set of
> addresses to send events, as is done for a cluster connection
> -------------------------------------------------------------------------------------------------------------------------------------
>
> Key: ARTEMIS-3759
> URL: https://issues.apache.org/jira/browse/ARTEMIS-3759
> Project: ActiveMQ Artemis
> Issue Type: Improvement
> Components: AMQP
> Affects Versions: 2.19.1, 2.21.0
> Reporter: Mikhail Lukyanov
> Priority: Major
> Attachments: ImageAddressSyntax.png, ImageInternalQueues.png
>
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> If a target broker of mirroring is in a cluster, then mirroring of the
> broker's internal queues also occurs, and often messages accumulate in such
> queues. In theory, internal cluster queues should not be mirrored, this does
> not make much sense.
> Therefore, it would be convenient to allow you to configure for which
> addresses and their queues mirroring will be performed, events will be sent
> (message-acknowledgements, queue-removal, queue-creation). The syntax that is
> used to specify the *_addresses_* of the *_cluster connection_* is well
> suited for this.
> Mirrored internal cluster queues
> !ImageInternalQueues.png!
> Address syntax
> !ImageAddressSyntax.png!
--
This message was sent by Atlassian Jira
(v8.20.7#820007)