[ https://issues.apache.org/jira/browse/ARTEMIS-2937?focusedWorklogId=504756&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-504756 ]
ASF GitHub Bot logged work on ARTEMIS-2937: ------------------------------------------- Author: ASF GitHub Bot Created on: 26/Oct/20 15:06 Start Date: 26/Oct/20 15:06 Worklog Time Spent: 10m Work Description: clebertsuconic commented on a change in pull request #3294: URL: https://github.com/apache/activemq-artemis/pull/3294#discussion_r512033104 ########## File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java ########## @@ -245,6 +254,142 @@ private String getText(boolean large, int i) { } } + /** + * Delivery annotations should be gone on the receiving side + * @throws Exception + */ + @Test + public void testLargeMessagesWithDeliveryAnnotations() throws Exception { + server.setIdentity("targetServer"); + server.start(); + server_2 = createServer(AMQP_PORT_2, false); + server_2.setIdentity("server_2"); + + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + AMQPMirrorBrokerConnectionElement replica = new AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true); + amqpConnection.addElement(replica); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + int NUMBER_OF_MESSAGES = 20; + + server_2.start(); + Wait.assertTrue(server_2::isStarted); + + // We create the address to avoid auto delete on the queue + server_2.addAddressInfo(new AddressInfo("TEST").addRoutingType(RoutingType.ANYCAST).setAutoCreated(false)); + server_2.createQueue(new QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST).setAddress("TEST").setAutoCreated(false)); + + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214")); + + // Get the Queue View early to avoid racing the delivery. 
+ final Queue queueView = locateQueue(server_2, "TEST"); + final Queue queueViewReplica = locateQueue(server_2, "TEST"); + + { // sender + AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + AMQP_PORT_2), null, null); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("TEST"); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + AmqpMessage message = new AmqpMessage(); + message.setDeliveryAnnotation("gone", "test"); + message.setText(getText(true, i)); + sender.send(message); + } + sender.close(); + connection.close(); + } + + Wait.assertEquals(NUMBER_OF_MESSAGES, queueView::getMessageCount); + Wait.assertEquals(NUMBER_OF_MESSAGES, queueViewReplica::getMessageCount); + + { // receiver on replica + AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + AMQP_PORT), null, null); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + // Now try and get the message + + AmqpReceiver receiver = session.createReceiver("TEST"); + receiver.flow(NUMBER_OF_MESSAGES); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + Assert.assertEquals(getText(true, i), received.getText()); + Assert.assertNull(received.getDeliveryAnnotation("gone")); + } + receiver.flow(1); + Assert.assertNull(receiver.receiveNoWait()); + + connection.close(); + } + } + + + @Test + public void testReplicaNoAddressOnMessage() throws Exception { + server.setIdentity("targetServer"); + server.start(); + server_2 = createServer(AMQP_PORT_2, false); + server_2.setIdentity("server_2"); + + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + AMQPMirrorBrokerConnectionElement replica = new 
AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true); + amqpConnection.addElement(replica); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + int NUMBER_OF_MESSAGES = 20; + + server_2.start(); + Wait.assertTrue(server_2::isStarted); + + // We create the address to avoid auto delete on the queue + server_2.addAddressInfo(new AddressInfo("TEST").addRoutingType(RoutingType.ANYCAST).setAutoCreated(false)); + server_2.createQueue(new QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST).setAddress("TEST").setAutoCreated(false)); Review comment: ok, I will do it.. it's a common practice on the entire test suite but I will do it anyway. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 504756) Time Spent: 25.5h (was: 25h 20m) > AMQP Server Connectivity > ------------------------ > > Key: ARTEMIS-2937 > URL: https://issues.apache.org/jira/browse/ARTEMIS-2937 > Project: ActiveMQ Artemis > Issue Type: New Feature > Components: AMQP > Reporter: Clebert Suconic > Assignee: Clebert Suconic > Priority: Major > Fix For: 2.16.0 > > Time Spent: 25.5h > Remaining Estimate: 0h > > This feature adds server side connectivity. > > It is possible to link two brokers directly using AMQP with this feature, and > have a Queue transferring messages to another broker directly. 
> > For this we would have options called <sender> and <receiver>. > > > It would also be possible to use qpid-dispatch as an intermediary between > clients and the brokers (or eventually between brokers); in that case the > option will be <peer>. > > It would also be possible to use <mirror> with a few options to replicate data > between two brokers, bringing the possibility of using it for Disaster & > Recovery and Failover. -- This message was sent by Atlassian Jira (v8.3.4#803005)