[ 
https://issues.apache.org/jira/browse/ARTEMIS-3243?focusedWorklogId=625328&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-625328
 ]

ASF GitHub Bot logged work on ARTEMIS-3243:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Jul/21 11:54
            Start Date: 20/Jul/21 11:54
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on a change in pull request #3633:
URL: https://github.com/apache/activemq-artemis/pull/3633#discussion_r672339680



##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
##########
@@ -66,6 +66,7 @@
    // Delivery annotation property used on mirror control routing and Ack
    public static final Symbol INTERNAL_ID = 
Symbol.getSymbol("x-opt-amq-mr-id");
    public static final Symbol INTERNAL_DESTINATION = 
Symbol.getSymbol("x-opt-amq-mr-dst");
+   public static final Symbol MIRROR_CAPABILITY = Symbol.getSymbol("amq-mr");

Review comment:
       Capabilities tend to use more obvious full-word values since they aren't 
sent so frequently.

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
##########
@@ -340,7 +344,19 @@ protected void remoteLinkOpened(Link link) throws 
Exception {
    }
 
    private boolean isReplicaTarget(Link link) {
-      return link != null && link.getTarget() != null && 
link.getTarget().getAddress() != null && 
link.getTarget().getAddress().equals(ProtonProtocolManager.MIRROR_ADDRESS);
+      boolean hasMirror = false;
+
+      Terminus terminus = (Terminus)link.getTarget();

Review comment:
       This should be looking at the 'remote target' rather than the local 
target. I see the calling method explicitly sets the local target to be the 
same object as the remote target, which is why this currently 'works'. 
   
   It shouldn't be doing that. That is precisely the type of thing that leads to 
servers ending up lying about what they are really doing, as I noted in chat 
recently, which breaks the ability to use terminus capabilities to detect 
support, as whatever you send just gets reflected back at you. If old brokers 
do the same as this (and I expect they do, as you didn't touch those lines 
recently that I can recall) then the new broker can't use the presence of this 
terminus capability to detect whether it's talking to a new broker with the 
updated mirror behaviour at the other end, as the old brokers will just blindly 
reflect it back.
   
   New brokers can still use its presence on incoming links to see that this is 
trying to be a mirror link, as this is doing, but not the reverse.
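
   To make the reflection problem concrete, here is a minimal sketch. It is a 
plain-Java model and not the proton-j API: the class and all of its method 
names are hypothetical, and the capability string is simply the one from the 
diff above.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical model of terminus-capability negotiation; this is not the proton-j API. */
final class CapabilityReflectionSketch {

   // Illustrative capability name, taken from the diff under review.
   static final String MIRROR_CAPABILITY = "amq-mr";

   /** Models a peer that reuses the remote target as its local target, echoing everything back. */
   static Set<String> answerByReflecting(Set<String> offered) {
      return new LinkedHashSet<>(offered);
   }

   /** Models a peer that only answers with the capabilities it really supports. */
   static Set<String> answerHonestly(Set<String> offered, Set<String> supported) {
      Set<String> answer = new LinkedHashSet<>(offered);
      answer.retainAll(supported);
      return answer;
   }

   /** What the attaching broker would conclude from the answer it received. */
   static boolean peerClaimsMirrorSupport(Set<String> answered) {
      return answered.contains(MIRROR_CAPABILITY);
   }
}
```

   In this model a reflecting peer always appears to support mirroring, 
whether or not it does, so the attaching side learns nothing from the answer. 
Only a peer that answers honestly gives a usable signal.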

##########
File path: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
##########
@@ -256,21 +262,46 @@ protected void configureAddressPolicy(ActiveMQServer 
server) {
 
    protected void createAddressAndQueues(ActiveMQServer server) throws 
Exception {
       // Default Queue
-      server.addAddressInfo(new 
AddressInfo(SimpleString.toSimpleString(getQueueName()), RoutingType.ANYCAST));
-      server.createQueue(new 
QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST));
+      try {
+         server.addAddressInfo(new 
AddressInfo(SimpleString.toSimpleString(getQueueName()), RoutingType.ANYCAST));
+      } catch (Throwable ignored) {
+      }

Review comment:
       Why is this needed? Swallowing test setup exceptions generally just to 
cater for a specific test seems like a bad idea. 
   
   If it's something like 'address already exists', the test should likely be 
overriding the method and changing it to accommodate its test-specific needs 
(e.g. get and clear a previously-set reference to some test-specific / 
startup-specific behaviour to execute).
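
   One possible shape for that, as a rough sketch only: `FakeServer`, 
`BaseSupport` and `RestartingTest` are hypothetical stand-ins rather than the 
real Artemis test classes, and the queue name is made up. The shared setup 
keeps failing loudly, while the one test that restarts its server overrides 
the hook.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-ins for the test support classes; not the real Artemis types. */
final class SetupOverrideSketch {

   /** Stand-in server that rejects duplicate addresses. */
   static final class FakeServer {
      final Set<String> addresses = new HashSet<>();

      void addAddress(String name) {
         if (!addresses.add(name)) {
            throw new IllegalStateException("Address already exists: " + name);
         }
      }
   }

   /** Stand-in for the shared support class: setup failures propagate to every test. */
   static class BaseSupport {
      protected void createAddressAndQueues(FakeServer server) {
         server.addAddress("exampleQueue");
      }
   }

   /** A test with special needs overrides the hook instead of the base class swallowing errors. */
   static class RestartingTest extends BaseSupport {
      @Override
      protected void createAddressAndQueues(FakeServer server) {
         // Only this test tolerates an address that survived a restart.
         if (!server.addresses.contains("exampleQueue")) {
            super.createAddressAndQueues(server);
         }
      }
   }
}
```

   With this arrangement a genuine setup failure in any other test still 
surfaces as an exception rather than being silently ignored.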

##########
File path: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/journal/AmqpJournalLoadingTest.java
##########
@@ -45,23 +43,25 @@ public void durableMessageDataNotScannedOnRestartTest() 
throws Exception {
       server.stop();
       server.start();
 
-      final AMQPMessage amqpMessage;
-
       final Queue afterRestartQueueView = getProxyToQueue(getQueueName());
 
       Wait.assertTrue("All messages should arrive", () -> 
afterRestartQueueView.getMessageCount() == 1);
 
-      try (LinkedListIterator<MessageReference> iterator = 
afterRestartQueueView.iterator()) {
-         Assert.assertTrue(iterator.hasNext());
-         final MessageReference next = iterator.next();
-         Assert.assertNotNull(next);
-         Assert.assertFalse(iterator.hasNext());
-         final Message message = next.getMessage();
-         Assert.assertThat(message, Matchers.instanceOf(AMQPMessage.class));
-         amqpMessage = (AMQPMessage) message;
-         
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, 
amqpMessage.getDataScanningStatus());
-         Assert.assertTrue(amqpMessage.isDurable());
-      }
+      final AtomicInteger foreachCount = new AtomicInteger(0);
+
+      ArrayList<AMQPMessage> messageReference = new ArrayList<>(1);
+
+      afterRestartQueueView.forEach((next) -> {
+         final AMQPMessage message = (AMQPMessage)next.getMessage();
+         
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, 
message.getDataScanningStatus());
+         Assert.assertTrue(message.isDurable());
+         // Doing the check again in case isDurable messed up the scanning 
status. It should not change the status by definition
+         
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, 
message.getDataScanningStatus());
+         messageReference.add(message);
+         foreachCount.incrementAndGet();

Review comment:
       I'm guessing there was originally meant to be an assert on this value 
that's now being covered by the messageReference list size? It's effectively 
unused and can just be removed as is.

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
##########
@@ -340,7 +344,19 @@ protected void remoteLinkOpened(Link link) throws 
Exception {
    }
 
    private boolean isReplicaTarget(Link link) {
-      return link != null && link.getTarget() != null && 
link.getTarget().getAddress() != null && 
link.getTarget().getAddress().equals(ProtonProtocolManager.MIRROR_ADDRESS);
+      boolean hasMirror = false;
+
+      Terminus terminus = (Terminus)link.getTarget();
+      if (terminus != null && terminus.getCapabilities() != null) {
+         for (Symbol s : terminus.getCapabilities()) {
+            if (s.equals(AMQPMirrorControllerSource.MIRROR_CAPABILITY)) {
+               hasMirror = true;
+               break;
+            }
+         }
+      }

Review comment:
       This now ignores the address when determining if it's a mirroring link, 
other than later requiring that some address is specified. I guess this is to 
allow for differing values, e.g. for doing 'dual mirrors' link routing both ways 
through a router network eventually?
   
   You would still need to use different addresses for each broker to attach to 
to facilitate that. You could also just say something like the address has to 
be the exact "$ACTIVEMQ_ARTEMIS_MIRROR" address (the default), or else have 
that as a prefix of the full address used. That way folks could still tweak the 
address attached to by 1 (or both if desired) source broker, by 
configuring/adding a suffix, still without needing to change the destination 
broker config to accommodate it, while keeping all the mirroring related stuff 
operating in a particular namespace and still easily identifiable. You wouldn't 
need the terminus capability then either (as above, it's only going to be half 
useful by the looks of things anyway).
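
   As a minimal sketch of the prefix idea: the class and method names below 
are hypothetical, and only the "$ACTIVEMQ_ARTEMIS_MIRROR" value comes from the 
comment above.

```java
/** Hypothetical helper illustrating a namespace-style check on the target address. */
final class MirrorAddressSketch {

   // Default mirror address, as quoted in the review comment.
   static final String MIRROR_ADDRESS = "$ACTIVEMQ_ARTEMIS_MIRROR";

   /** True for the exact default address, or for any address that extends it with a suffix. */
   static boolean isMirrorAddress(String address) {
      return address != null && address.startsWith(MIRROR_ADDRESS);
   }
}
```

   A source broker configured with a suffix (for instance a made-up 
"$ACTIVEMQ_ARTEMIS_MIRROR_brokerA") would still be recognised, while ordinary 
addresses would not, and nothing would need to change on the destination 
broker.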

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
##########
@@ -340,7 +344,19 @@ protected void remoteLinkOpened(Link link) throws 
Exception {
    }
 
    private boolean isReplicaTarget(Link link) {
-      return link != null && link.getTarget() != null && 
link.getTarget().getAddress() != null && 
link.getTarget().getAddress().equals(ProtonProtocolManager.MIRROR_ADDRESS);
+      boolean hasMirror = false;
+
+      Terminus terminus = (Terminus)link.getTarget();
+      if (terminus != null && terminus.getCapabilities() != null) {
+         for (Symbol s : terminus.getCapabilities()) {
+            if (s.equals(AMQPMirrorControllerSource.MIRROR_CAPABILITY)) {
+               hasMirror = true;
+               break;
+            }
+         }
+      }
+
+      return link != null && link.getTarget() != null && 
link.getTarget().getAddress() != null && hasMirror;

Review comment:
       Superfluous null check on the link, as it would have NPE'd at the top of 
the method otherwise. The terminus is also already held in a local variable 
that could be used here.
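
   Roughly what I mean, as a sketch only: `Link` and `Terminus` here are 
hypothetical stand-in interfaces rather than the proton-j types, and `linkTo` 
is just a helper for trying it out. It covers only the null handling and the 
reuse of the local variable.

```java
/** Hypothetical stand-ins for the link and terminus types; this is not the proton-j API. */
final class ReplicaTargetSketch {

   interface Terminus {
      String getAddress();
      String[] getCapabilities();
   }

   interface Link {
      Terminus getTarget();
   }

   static boolean isReplicaTarget(Link link, String mirrorCapability) {
      // Check the link once, before anything dereferences it.
      if (link == null) {
         return false;
      }
      // Reuse the local variable instead of calling link.getTarget() repeatedly.
      Terminus terminus = link.getTarget();
      if (terminus == null || terminus.getAddress() == null || terminus.getCapabilities() == null) {
         return false;
      }
      for (String capability : terminus.getCapabilities()) {
         if (mirrorCapability.equals(capability)) {
            return true;
         }
      }
      return false;
   }

   /** Helper for trying the sketch out: a link whose target has the given address and capabilities. */
   static Link linkTo(final String address, final String... capabilities) {
      final Terminus terminus = new Terminus() {
         @Override
         public String getAddress() {
            return address;
         }

         @Override
         public String[] getCapabilities() {
            return capabilities;
         }
      };
      return () -> terminus;
   }
}
```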

##########
File path: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/journal/AmqpJournalLoadingTest.java
##########
@@ -45,23 +43,25 @@ public void durableMessageDataNotScannedOnRestartTest() 
throws Exception {
       server.stop();
       server.start();
 
-      final AMQPMessage amqpMessage;
-
       final Queue afterRestartQueueView = getProxyToQueue(getQueueName());
 
       Wait.assertTrue("All messages should arrive", () -> 
afterRestartQueueView.getMessageCount() == 1);
 
-      try (LinkedListIterator<MessageReference> iterator = 
afterRestartQueueView.iterator()) {
-         Assert.assertTrue(iterator.hasNext());
-         final MessageReference next = iterator.next();
-         Assert.assertNotNull(next);
-         Assert.assertFalse(iterator.hasNext());
-         final Message message = next.getMessage();
-         Assert.assertThat(message, Matchers.instanceOf(AMQPMessage.class));
-         amqpMessage = (AMQPMessage) message;
-         
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, 
amqpMessage.getDataScanningStatus());
-         Assert.assertTrue(amqpMessage.isDurable());
-      }
+      final AtomicInteger foreachCount = new AtomicInteger(0);
+
+      ArrayList<AMQPMessage> messageReference = new ArrayList<>(1);
+
+      afterRestartQueueView.forEach((next) -> {

Review comment:
       The commit message seems confusing: it says "Changing 
AmqpJournalLoadingTest to use queue.iterator after the method addition", yet 
this appears to stop using the iterator method and start using the forEach 
method?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 625328)
    Time Spent: 23h 10m  (was: 23h)

> Enhance AMQP Mirror support with dual mirror
> --------------------------------------------
>
>                 Key: ARTEMIS-3243
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-3243
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>    Affects Versions: 2.17.0
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>             Fix For: 2.18.0
>
>          Time Spent: 23h 10m
>  Remaining Estimate: 0h
>
> At the current Mirror version, we can only mirror in a single direction.
> With this enhancement the two (or more) brokers would be connected to each 
> other, each one having its own ID, and each one would send updates to the 
> other broker.
> The outcome is that if you just transferred producers and consumers from one 
> broker into the other, the fallback would be automatic and simple. No need to 
> disable and enable mirror options.


