[
https://issues.apache.org/jira/browse/ARTEMIS-3243?focusedWorklogId=625328&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-625328
]
ASF GitHub Bot logged work on ARTEMIS-3243:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 20/Jul/21 11:54
Start Date: 20/Jul/21 11:54
Worklog Time Spent: 10m
Work Description: gemmellr commented on a change in pull request #3633:
URL: https://github.com/apache/activemq-artemis/pull/3633#discussion_r672339680
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
##########
@@ -66,6 +66,7 @@
// Delivery annotation property used on mirror control routing and Ack
public static final Symbol INTERNAL_ID =
Symbol.getSymbol("x-opt-amq-mr-id");
public static final Symbol INTERNAL_DESTINATION =
Symbol.getSymbol("x-opt-amq-mr-dst");
+ public static final Symbol MIRROR_CAPABILITY = Symbol.getSymbol("amq-mr");
Review comment:
Capabilities tend to use more obvious full word values since they arent
being so frequently sent.
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
##########
@@ -340,7 +344,19 @@ protected void remoteLinkOpened(Link link) throws
Exception {
}
private boolean isReplicaTarget(Link link) {
- return link != null && link.getTarget() != null &&
link.getTarget().getAddress() != null &&
link.getTarget().getAddress().equals(ProtonProtocolManager.MIRROR_ADDRESS);
+ boolean hasMirror = false;
+
+ Terminus terminus = (Terminus)link.getTarget();
Review comment:
This should be looking at the 'remote target' rather than the local
target. I see the calling method explicitly sets the local target to be the
same object as the remote target, which is why this currently 'works'.
It shouldnt be doing that. That is the precise type of thing that leads to
servers ending up lying about what they are really doing as I noted in chat
recently, which breaks the ability to use terminus capabilities to detect
support, as whatever you send just gets reflected back at you. If old brokers
do the same as this (and I expect they do as you didnt touch those lines
recently that I can recall) then the new broker cant use presence of this
terminus capability to detect if its talking to a new broker with the updated
mirror behaviour at the other end, as the old brokers will just blindly reflect
it back at them.
New brokers can still use its presence on incoming links to see that this is
trying to be a mirror link, as this is doing, but not the reverse.
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
##########
@@ -256,21 +262,46 @@ protected void configureAddressPolicy(ActiveMQServer
server) {
protected void createAddressAndQueues(ActiveMQServer server) throws
Exception {
// Default Queue
- server.addAddressInfo(new
AddressInfo(SimpleString.toSimpleString(getQueueName()), RoutingType.ANYCAST));
- server.createQueue(new
QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST));
+ try {
+ server.addAddressInfo(new
AddressInfo(SimpleString.toSimpleString(getQueueName()), RoutingType.ANYCAST));
+ } catch (Throwable ignored) {
+ }
Review comment:
Why is this needed? Swallowing test setup exceptions generally just to
cater for a specific test seems like a bad idea.
If its something like 'address already exists' the test should likely be
overriding the method and changing it to accommodate its test-specific needs
(e.g get and clear a previously-set reference to some test-specific /
startup-specific behaviour to execute).
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/journal/AmqpJournalLoadingTest.java
##########
@@ -45,23 +43,25 @@ public void durableMessageDataNotScannedOnRestartTest()
throws Exception {
server.stop();
server.start();
- final AMQPMessage amqpMessage;
-
final Queue afterRestartQueueView = getProxyToQueue(getQueueName());
Wait.assertTrue("All messages should arrive", () ->
afterRestartQueueView.getMessageCount() == 1);
- try (LinkedListIterator<MessageReference> iterator =
afterRestartQueueView.iterator()) {
- Assert.assertTrue(iterator.hasNext());
- final MessageReference next = iterator.next();
- Assert.assertNotNull(next);
- Assert.assertFalse(iterator.hasNext());
- final Message message = next.getMessage();
- Assert.assertThat(message, Matchers.instanceOf(AMQPMessage.class));
- amqpMessage = (AMQPMessage) message;
-
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE,
amqpMessage.getDataScanningStatus());
- Assert.assertTrue(amqpMessage.isDurable());
- }
+ final AtomicInteger foreachCount = new AtomicInteger(0);
+
+ ArrayList<AMQPMessage> messageReference = new ArrayList<>(1);
+
+ afterRestartQueueView.forEach((next) -> {
+ final AMQPMessage message = (AMQPMessage)next.getMessage();
+
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE,
message.getDataScanningStatus());
+ Assert.assertTrue(message.isDurable());
+ // Doing the check again in case isDurable messed up the scanning
status. It should not change the status by definition
+
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE,
message.getDataScanningStatus());
+ messageReference.add(message);
+ foreachCount.incrementAndGet();
Review comment:
I'm guessing there was originally meant to be an assert on this value
thats now being covered by the messageReference list size? Its effectively
unused and can just be removed as is.
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
##########
@@ -340,7 +344,19 @@ protected void remoteLinkOpened(Link link) throws
Exception {
}
private boolean isReplicaTarget(Link link) {
- return link != null && link.getTarget() != null &&
link.getTarget().getAddress() != null &&
link.getTarget().getAddress().equals(ProtonProtocolManager.MIRROR_ADDRESS);
+ boolean hasMirror = false;
+
+ Terminus terminus = (Terminus)link.getTarget();
+ if (terminus != null && terminus.getCapabilities() != null) {
+ for (Symbol s : terminus.getCapabilities()) {
+ if (s.equals(AMQPMirrorControllerSource.MIRROR_CAPABILITY)) {
+ hasMirror = true;
+ break;
+ }
+ }
+ }
Review comment:
This now ignores the address when determining if its a mirroring link,
other than later requiring there is some address specified. I guess this is to
allow for differing values for e.g doing 'dual mirrors' link routing both ways
through a router network eventually?
You would still need to use different addresses for each broker to attach to
to facilitate that. You could also just say something like the address has to
be the exact "$ACTIVEMQ_ARTEMIS_MIRROR" address (the default), or else have
that as a prefix of the full address used. That way folks could still tweak the
address attached to by 1 (or both if desired) source broker, by
configuring/adding a suffix, still without needing to change the destination
broker config to accommodate it, while keeping all the mirroring related stuff
operating in a particular namespace and still easily identifiable. You wouldn't
need the terminus capability then either (as above, its only going to be half
useful by looks of things anyway).
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
##########
@@ -340,7 +344,19 @@ protected void remoteLinkOpened(Link link) throws
Exception {
}
private boolean isReplicaTarget(Link link) {
- return link != null && link.getTarget() != null &&
link.getTarget().getAddress() != null &&
link.getTarget().getAddress().equals(ProtonProtocolManager.MIRROR_ADDRESS);
+ boolean hasMirror = false;
+
+ Terminus terminus = (Terminus)link.getTarget();
+ if (terminus != null && terminus.getCapabilities() != null) {
+ for (Symbol s : terminus.getCapabilities()) {
+ if (s.equals(AMQPMirrorControllerSource.MIRROR_CAPABILITY)) {
+ hasMirror = true;
+ break;
+ }
+ }
+ }
+
+ return link != null && link.getTarget() != null &&
link.getTarget().getAddress() != null && hasMirror;
Review comment:
Superfluous null check, as it would have NPE'd at the top of the method
otherwise. The terminus also has a variable that could be used.
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/journal/AmqpJournalLoadingTest.java
##########
@@ -45,23 +43,25 @@ public void durableMessageDataNotScannedOnRestartTest()
throws Exception {
server.stop();
server.start();
- final AMQPMessage amqpMessage;
-
final Queue afterRestartQueueView = getProxyToQueue(getQueueName());
Wait.assertTrue("All messages should arrive", () ->
afterRestartQueueView.getMessageCount() == 1);
- try (LinkedListIterator<MessageReference> iterator =
afterRestartQueueView.iterator()) {
- Assert.assertTrue(iterator.hasNext());
- final MessageReference next = iterator.next();
- Assert.assertNotNull(next);
- Assert.assertFalse(iterator.hasNext());
- final Message message = next.getMessage();
- Assert.assertThat(message, Matchers.instanceOf(AMQPMessage.class));
- amqpMessage = (AMQPMessage) message;
-
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE,
amqpMessage.getDataScanningStatus());
- Assert.assertTrue(amqpMessage.isDurable());
- }
+ final AtomicInteger foreachCount = new AtomicInteger(0);
+
+ ArrayList<AMQPMessage> messageReference = new ArrayList<>(1);
+
+ afterRestartQueueView.forEach((next) -> {
Review comment:
The commit message seems confusing, it says "Changing
AmqpJournalLoadingTest to use queue.iterator after the method addition" yet
this appears to stop using the iterator method and start using the forEach
method"?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 625328)
Time Spent: 23h 10m (was: 23h)
> Enhance AMQP Mirror support with dual mirror
> --------------------------------------------
>
> Key: ARTEMIS-3243
> URL: https://issues.apache.org/jira/browse/ARTEMIS-3243
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Affects Versions: 2.17.0
> Reporter: Clebert Suconic
> Assignee: Clebert Suconic
> Priority: Major
> Fix For: 2.18.0
>
> Time Spent: 23h 10m
> Remaining Estimate: 0h
>
> at the current Mirror version, we can only mirror into a single direction.
> With this enhancement the two (or more brokers) would be connected to each
> other, each one having its own ID, and each one would send updates to the
> other broker.
> The outcome is that if you just transferred producers and consumers from one
> broker into the other, the fallback would be automatic and simple. No need to
> disable and enable mirror options.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)