[ 
https://issues.apache.org/jira/browse/ARTEMIS-2937?focusedWorklogId=504750&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-504750
 ]

ASF GitHub Bot logged work on ARTEMIS-2937:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Oct/20 14:53
            Start Date: 26/Oct/20 14:53
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on a change in pull request #3294:
URL: https://github.com/apache/activemq-artemis/pull/3294#discussion_r512022622



##########
File path: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
##########
@@ -245,6 +254,142 @@ private String getText(boolean large, int i) {
       }
    }
 
+   /**
+    * Delivery annotations should be gone on the receiving side
+    * @throws Exception
+    */
+   @Test
+   public void testLargeMessagesWithDeliveryAnnotations() throws Exception {
+      server.setIdentity("targetServer");
+      server.start();
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_2.setIdentity("server_2");
+
+      AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+      AMQPMirrorBrokerConnectionElement replica = new 
AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true);
+      amqpConnection.addElement(replica);
+      server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+      int NUMBER_OF_MESSAGES = 20;
+
+      server_2.start();
+      Wait.assertTrue(server_2::isStarted);
+
+      // We create the address to avoid auto delete on the queue
+      server_2.addAddressInfo(new 
AddressInfo("TEST").addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+      server_2.createQueue(new 
QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST).setAddress("TEST").setAutoCreated(false));
+
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214"));
+
+      // Get the Queue View early to avoid racing the delivery.
+      final Queue queueView = locateQueue(server_2, "TEST");
+      final Queue queueViewReplica = locateQueue(server_2, "TEST");
+
+      { // sender
+         AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + 
AMQP_PORT_2), null, null);
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+
+         AmqpSender sender = session.createSender("TEST");
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            AmqpMessage message = new AmqpMessage();
+            message.setDeliveryAnnotation("gone", "test");
+            message.setText(getText(true, i));
+            sender.send(message);
+         }
+         sender.close();
+         connection.close();
+      }
+
+      Wait.assertEquals(NUMBER_OF_MESSAGES, queueView::getMessageCount);
+      Wait.assertEquals(NUMBER_OF_MESSAGES, queueViewReplica::getMessageCount);
+
+      { // receiver on replica
+         AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + 
AMQP_PORT), null, null);
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+         // Now try and get the message
+
+         AmqpReceiver receiver = session.createReceiver("TEST");
+         receiver.flow(NUMBER_OF_MESSAGES);
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(received);
+            Assert.assertEquals(getText(true, i), received.getText());
+            Assert.assertNull(received.getDeliveryAnnotation("gone"));
+         }
+         receiver.flow(1);
+         Assert.assertNull(receiver.receiveNoWait());
+
+         connection.close();
+      }
+   }
+
+
+   @Test
+   public void testReplicaNoAddressOnMessage() throws Exception {
+      server.setIdentity("targetServer");
+      server.start();
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_2.setIdentity("server_2");
+
+      AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+      AMQPMirrorBrokerConnectionElement replica = new 
AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true);
+      amqpConnection.addElement(replica);
+      server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+      int NUMBER_OF_MESSAGES = 20;
+
+      server_2.start();
+      Wait.assertTrue(server_2::isStarted);
+
+      // We create the address to avoid auto delete on the queue
+      server_2.addAddressInfo(new 
AddressInfo("TEST").addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+      server_2.createQueue(new 
QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST).setAddress("TEST").setAutoCreated(false));
+
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214"));
+
+      { // sender
+         AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + 
AMQP_PORT_2), null, null);
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+
+         AmqpSender sender = session.createSender("TEST");
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            AmqpMessage message = new AmqpMessage();
+            message.setDeliveryAnnotation("gone", "test");
+            message.setText(getText(false, i));
+            sender.send(message);
+         }
+         sender.close();
+         connection.close();
+      }
+
+      { // receiver on replica
+         AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + 
AMQP_PORT), null, null);
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+         // Now try and get the message
+
+         AmqpReceiver receiver = session.createReceiver("TEST");
+         receiver.flow(NUMBER_OF_MESSAGES);
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(received);
+            Assert.assertEquals(getText(false, i), received.getText());
+            Assert.assertNull(received.getDeliveryAnnotation("gone"));
+         }
+         receiver.flow(1);
+         Assert.assertNull(receiver.receiveNoWait());

Review comment:
       It might fail now and again since its a race, yes (I doubt 1 in 10 since 
its racing a local check against remote action of new credit and the broker 
compling delivery of a message to the client before it polls its 
buffer)....which history would show people will tend to ignore if it passes the 
next time.
   
   If its aiming to verify there are no more messages a better way would seem 
to be to grant more than NUMBER_OF_MESSAGES of credit at the start and not 
bother granting more at the end, so they get prefetched while the others do.
   
   I personally dont see a point in verifying in this test that there arent 
more messages. Its not like its testing selection or anything.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 504750)
    Time Spent: 25h  (was: 24h 50m)

> AMQP Server Connectivity
> ------------------------
>
>                 Key: ARTEMIS-2937
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-2937
>             Project: ActiveMQ Artemis
>          Issue Type: New Feature
>          Components: AMQP
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>             Fix For: 2.16.0
>
>          Time Spent: 25h
>  Remaining Estimate: 0h
>
> This feature adds server side connectivity.
>  
> It is possible to link two brokers directly using AMQP with this feature, and 
> have a Queue transferring messages to another broker directly. 
>  
> For this we would have options called <sender and <receiver
>  
>  
> it would also be possible to use qpid-dispatch as an intermediary between 
> clients and the brokers (or eventually between brokers), on that case the 
> option will be <peer
>  
> it would also be possible to use <mirror with a few option to replicate data 
> between two brokers, bringing the possibility of using it for Disaster & 
> Recovery and Failover.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to