[
https://issues.apache.org/jira/browse/ARTEMIS-3243?focusedWorklogId=626761&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-626761
]
ASF GitHub Bot logged work on ARTEMIS-3243:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 22/Jul/21 16:13
Start Date: 22/Jul/21 16:13
Worklog Time Spent: 10m
Work Description: gemmellr commented on a change in pull request #3633:
URL: https://github.com/apache/activemq-artemis/pull/3633#discussion_r674866234
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
##########
@@ -511,22 +534,88 @@ private void connectSender(Queue queue,
sender.setProperties(mapProperties);
}
+ if (desiredCapabilities != null) {
+ sender.setDesiredCapabilities(desiredCapabilities);
+ }
+
AMQPOutgoingController outgoingInitializer = new
AMQPOutgoingController(queue, sender, sessionContext.getSessionSPI());
+ sender.open();
+
ProtonServerSenderContext senderContext = new
ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender,
sessionContext, sessionContext.getSessionSPI(),
outgoingInitializer).setBeforeDelivery(beforeDeliver);
+ // This will be done on the remote open
+ sender.setContext(new Runnable() {
+ @Override
+ public void run() {
Review comment:
This, or the code executing it, should verify that the remote target is set.
If it is null, that indicates the link is being refused, and a detach with an
error will follow the attach to explain why.
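A minimal sketch of the check I mean, under stated assumptions: `AttachedLink`
is a hypothetical stand-in for the one accessor of the proton-j link that
matters here (`getRemoteTarget()`), and `RemoteTargetCheck` with its callbacks
is illustrative naming, not code from this PR.

```java
/** Hypothetical stand-in for the part of a proton-j link used by this check. */
interface AttachedLink {
    /** Target carried by the peer's attach, or null if the peer set none. */
    Object getRemoteTarget();
}

final class RemoteTargetCheck {
    private RemoteTargetCheck() {
    }

    /** A null remote target means the peer is refusing the link. */
    static boolean isAccepted(AttachedLink link) {
        return link.getRemoteTarget() != null;
    }

    /**
     * Runs the normal remote-open work only when the link was accepted.
     * When it was refused, the detach that follows carries the reason,
     * so the refused path should not register the sender.
     */
    static void onRemoteOpen(AttachedLink link, Runnable onAccepted, Runnable onRefused) {
        if (isAccepted(link)) {
            onAccepted.run();
        } else {
            onRefused.run();
        }
    }
}
```

In the PR this would amount to an early return at the top of the runnable,
before the capability and broker ID checks.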
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
##########
@@ -511,22 +534,88 @@ private void connectSender(Queue queue,
sender.setProperties(mapProperties);
}
+ if (desiredCapabilities != null) {
+ sender.setDesiredCapabilities(desiredCapabilities);
+ }
+
AMQPOutgoingController outgoingInitializer = new
AMQPOutgoingController(queue, sender, sessionContext.getSessionSPI());
+ sender.open();
+
ProtonServerSenderContext senderContext = new
ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender,
sessionContext, sessionContext.getSessionSPI(),
outgoingInitializer).setBeforeDelivery(beforeDeliver);
+ // This will be done on the remote open
+ sender.setContext(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ if (desiredCapabilities != null) {
+ if (!verifyCapabilities(sender, desiredCapabilities)) {
Review comment:
"verifyOfferedCapabilities" might be clearer.
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/exceptions/InvalidBrokerID.java
##########
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.connect.exceptions;
+
+public class InvalidBrokerID extends Exception {
+ final Object localID;
+ final Object remoteID;
+
+ public InvalidBrokerID(Object localID, Object remoteID) {
+ super(InvalidBrokerID.class.getName() + "(" + localID + "," + remoteID +
")");
Review comment:
This only seems to be used when the remote is non-null (the context would NPE
otherwise) and the local has the same value, so does it need to take and print
both? Would something like 'Invalid remote broker ID, matches local: <id>' be
clearer?
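Roughly what I have in mind, as a sketch only: the single-argument constructor
and the `getRemoteID()` accessor are my assumptions, not something already in
the PR.

```java
/** Sketch of a single-ID variant of the exception under review. */
class InvalidBrokerID extends Exception {
    private final Object remoteID;

    /** The remote ID is only invalid because it equals the local one, so one value suffices. */
    InvalidBrokerID(Object remoteID) {
        super("Invalid remote broker ID, matches local: " + remoteID);
        this.remoteID = remoteID;
    }

    /** Hypothetical accessor, in case callers want the offending ID. */
    Object getRemoteID() {
        return remoteID;
    }
}
```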
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
##########
@@ -511,22 +534,88 @@ private void connectSender(Queue queue,
sender.setProperties(mapProperties);
}
+ if (desiredCapabilities != null) {
+ sender.setDesiredCapabilities(desiredCapabilities);
+ }
+
AMQPOutgoingController outgoingInitializer = new
AMQPOutgoingController(queue, sender, sessionContext.getSessionSPI());
+ sender.open();
+
ProtonServerSenderContext senderContext = new
ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender,
sessionContext, sessionContext.getSessionSPI(),
outgoingInitializer).setBeforeDelivery(beforeDeliver);
+ // This will be done on the remote open
+ sender.setContext(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ if (desiredCapabilities != null) {
+ if (!verifyCapabilities(sender, desiredCapabilities)) {
+ error(new MissingCapability(desiredCapabilities),
lastRetryCounter);
+ return;
+ }
+ }
+ if (brokerID != null) {
+ if (sender.getRemoteProperties() == null ||
+
!sender.getRemoteProperties().containsKey(AMQPMirrorControllerSource.BROKER_ID))
{
+ error(new MissingBrokerID(), lastRetryCounter);
+ return;
+ }
+
+ Object remoteBrokerID =
sender.getRemoteProperties().get(AMQPMirrorControllerSource.BROKER_ID);
+ if (remoteBrokerID.equals(brokerID)) {
+ error(new InvalidBrokerID(brokerID,
remoteBrokerID), lastRetryCounter);
+ }
+ }
+ sessionContext.addSender(sender, senderContext);
+ if (senderConsumer != null) {
+ senderConsumer.accept(sender);
+ }
+ } catch (Exception e) {
+ error(e);
+ }
+ }
+ });
- sessionContext.addSender(sender, senderContext);
- if (senderConsumer != null) {
- senderConsumer.accept(sender);
- }
} catch (Exception e) {
error(e);
}
protonRemotingConnection.getAmqpConnection().flush();
});
}
+ protected boolean verifyCapabilities(Sender sender, Symbol[] capabilities) {
Review comment:
desiredCapabilities for clarity?
##########
File path: examples/features/broker-connection/disaster-recovery/readme.md
##########
@@ -0,0 +1,11 @@
+# AMQP Broker Connection with Senders and SSL
Review comment:
The title needs updating. In fact the whole readme needs updating to describe
the actual usage and to remove the various mentions of SSL, since the example
isn't using it.
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
##########
@@ -276,8 +285,11 @@ public void postAcknowledge(MessageReference ref,
AckReason reason) throws Excep
return;
}
- if ((controllerInUse != null &&
controllerInUse.getRemoteMirrorId().equals(getRemoteMirrorId())) || // mirror
reflection ||
- (ref.getQueue() != null && (ref.getQueue().isInternalQueue() ||
ref.getQueue().isMirrorController()))) { // control queues {
+ if (invalidTarget(controllerInUse)) {
+ return;
+ }
+
+ if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() ||
ref.getQueue().isMirrorController()))) { // control queues {
Review comment:
Straggling { after comment.
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
##########
@@ -172,6 +175,15 @@ public AMQPConnectionContext(ProtonProtocolManager
protocolManager,
}
}
+ public LinkCloseListener getLinkCloseListener() {
Review comment:
Does this also handle detaches with closed=false? LinkDetachListener
that takes a boolean might be more widely applicable.
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
##########
@@ -511,22 +534,88 @@ private void connectSender(Queue queue,
sender.setProperties(mapProperties);
}
+ if (desiredCapabilities != null) {
+ sender.setDesiredCapabilities(desiredCapabilities);
+ }
+
AMQPOutgoingController outgoingInitializer = new
AMQPOutgoingController(queue, sender, sessionContext.getSessionSPI());
+ sender.open();
+
ProtonServerSenderContext senderContext = new
ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender,
sessionContext, sessionContext.getSessionSPI(),
outgoingInitializer).setBeforeDelivery(beforeDeliver);
+ // This will be done on the remote open
Review comment:
If that happens. Is there a timeout or similar that needs to be applied in
case it doesn't?
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
##########
@@ -511,22 +534,88 @@ private void connectSender(Queue queue,
sender.setProperties(mapProperties);
}
+ if (desiredCapabilities != null) {
+ sender.setDesiredCapabilities(desiredCapabilities);
+ }
+
AMQPOutgoingController outgoingInitializer = new
AMQPOutgoingController(queue, sender, sessionContext.getSessionSPI());
+ sender.open();
+
ProtonServerSenderContext senderContext = new
ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender,
sessionContext, sessionContext.getSessionSPI(),
outgoingInitializer).setBeforeDelivery(beforeDeliver);
+ // This will be done on the remote open
+ sender.setContext(new Runnable() {
Review comment:
The context is also used afterwards in other places to set the server object
on. Presumably that's why "sessionContext.addSender(sender, senderContext);"
moved inside the runnable. You could set this in the 'attachments()' Record
instead of using the context field for both, if you liked.
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
##########
@@ -340,7 +344,19 @@ protected void remoteLinkOpened(Link link) throws
Exception {
}
private boolean isReplicaTarget(Link link) {
- return link != null && link.getTarget() != null &&
link.getTarget().getAddress() != null &&
link.getTarget().getAddress().equals(ProtonProtocolManager.MIRROR_ADDRESS);
+ boolean hasMirror = false;
+
+ Terminus terminus = (Terminus)link.getTarget();
+ if (terminus != null && terminus.getCapabilities() != null) {
+ for (Symbol s : terminus.getCapabilities()) {
+ if (s.equals(AMQPMirrorControllerSource.MIRROR_CAPABILITY)) {
+ hasMirror = true;
+ break;
+ }
+ }
+ }
Review comment:
Target-address is probably the most descriptive. Perhaps something like
mirror-address or mirror-target might work. Just internal-address seems a
little generic.
I still like the idea of having $ACTIVEMQ_ARTEMIS_MIRROR serve as a kind of
address namespace prefix (or exact match) for determining all the mirror links,
grouping them in a way, and allowing the terminus capability to be dropped.
Right now the address seems somewhat ignored other than the permission check,
which seems a little funny, as it seems folks could use any other address they
do have permission on?
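To make the prefix idea concrete, a small sketch under assumptions: the
`MirrorAddresses` class and `isMirrorAddress` method are hypothetical names,
and the prefix string is simply the one quoted above rather than a reference
to the real constant.

```java
final class MirrorAddresses {
    /** Assumed prefix, taken from the comment above; illustrative only. */
    static final String MIRROR_PREFIX = "$ACTIVEMQ_ARTEMIS_MIRROR";

    private MirrorAddresses() {
    }

    /** True for the exact mirror address and for anything in its namespace. */
    static boolean isMirrorAddress(String address) {
        return address != null && address.startsWith(MIRROR_PREFIX);
    }
}
```

With a check like this the address itself identifies a mirror link, so the
permission check and the mirror detection would be looking at the same thing.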
##########
File path:
examples/features/broker-connection/disaster-recovery/src/main/resources/activemq/server0/broker.xml
##########
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+ <core xmlns="urn:activemq:core">
+
+ <bindings-directory>./data/messaging/bindings</bindings-directory>
+
+ <journal-directory>./data/messaging/journal</journal-directory>
+
+
<large-messages-directory>./data/messaging/largemessages</large-messages-directory>
+
+ <paging-directory>./data/messaging/paging</paging-directory>
+
+ <journal-datasync>true</journal-datasync>
+
+ <journal-min-files>2</journal-min-files>
+
+ <journal-pool-files>10</journal-pool-files>
+
+ <journal-device-block-size>4096</journal-device-block-size>
+
+ <journal-file-size>10M</journal-file-size>
+
+
+ <!-- Acceptors -->
+ <acceptors>
+ <acceptor
name="artemis">tcp://0.0.0.0:5671?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
Review comment:
Suggest not using the AMQPS port, since this isn't using SSL.
##########
File path:
examples/features/broker-connection/disaster-recovery/src/main/resources/activemq/server0/broker.xml
##########
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+ <core xmlns="urn:activemq:core">
+
+ <bindings-directory>./data/messaging/bindings</bindings-directory>
+
+ <journal-directory>./data/messaging/journal</journal-directory>
+
+
<large-messages-directory>./data/messaging/largemessages</large-messages-directory>
+
+ <paging-directory>./data/messaging/paging</paging-directory>
+
+ <journal-datasync>true</journal-datasync>
+
+ <journal-min-files>2</journal-min-files>
+
+ <journal-pool-files>10</journal-pool-files>
+
+ <journal-device-block-size>4096</journal-device-block-size>
+
+ <journal-file-size>10M</journal-file-size>
+
+
+ <!-- Acceptors -->
+ <acceptors>
+ <acceptor
name="artemis">tcp://0.0.0.0:5671?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
+ </acceptors>
+ <broker-connections>
+ <amqp-connection uri="tcp://localhost:5771" name="otherBrokerSSL"
retry-interval="1000">
Review comment:
Suggest changing the name, as it isn't using SSL.
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
##########
@@ -324,10 +344,11 @@ protected void remoteLinkOpened(Link link) throws
Exception {
} else {
if (isReplicaTarget(receiver)) {
try {
-
protonSession.getSessionSPI().check(SimpleString.toSimpleString(ProtonProtocolManager.MIRROR_ADDRESS),
CheckType.SEND, getSecurityAuth());
+
protonSession.getSessionSPI().check(SimpleString.toSimpleString(link.getTarget().getAddress()),
CheckType.SEND, getSecurityAuth());
} catch (ActiveMQSecurityException e) {
throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingProducer(e.getMessage());
}
+ receiver.setOfferedCapabilities(new
Symbol[]{AMQPMirrorControllerSource.MIRROR_CAPABILITY});
Review comment:
It should verify that it was a desired capability and refuse links that
don't have it set, i.e. old brokers that don't know about it, or future brokers
should they change in incompatible ways and update the capability (or, for now,
current brokers whose info got stripped by a router intermediary).
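A sketch of the receiving-side check, with assumptions flagged: plain strings
stand in for proton-j `Symbol` values so this runs standalone, `CapabilityCheck`
and `desires` are hypothetical names, and "example.mirror" in the usage note is
a placeholder rather than the real capability value.

```java
import java.util.Arrays;

final class CapabilityCheck {
    private CapabilityCheck() {
    }

    /**
     * True only when the peer explicitly listed the required capability among
     * its desired capabilities. A null array (nothing desired) is a refusal case.
     */
    static boolean desires(String[] remoteDesired, String required) {
        return remoteDesired != null && Arrays.asList(remoteDesired).contains(required);
    }
}
```

For example, `desires(new String[] {"example.mirror"}, "example.mirror")` passes,
while a peer that sent no desired capabilities at all would be refused instead
of being offered the capability unconditionally.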
Issue Time Tracking
-------------------
Worklog Id: (was: 626761)
Time Spent: 24h 10m (was: 24h)
> Enhance AMQP Mirror support with dual mirror
> --------------------------------------------
>
> Key: ARTEMIS-3243
> URL: https://issues.apache.org/jira/browse/ARTEMIS-3243
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Affects Versions: 2.17.0
> Reporter: Clebert Suconic
> Assignee: Clebert Suconic
> Priority: Major
> Fix For: 2.18.0
>
> Time Spent: 24h 10m
> Remaining Estimate: 0h
>
> At the current Mirror version, we can only mirror in a single direction.
> With this enhancement the two (or more) brokers would be connected to each
> other, each one having its own ID, and each one would send updates to the
> other broker.
> The outcome is that if you just transferred producers and consumers from one
> broker to the other, the fallback would be automatic and simple. No need to
> disable and enable mirror options.