[
https://issues.apache.org/jira/browse/ARTEMIS-3243?focusedWorklogId=617677&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-617677
]
ASF GitHub Bot logged work on ARTEMIS-3243:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 01/Jul/21 16:03
Start Date: 01/Jul/21 16:03
Worklog Time Spent: 10m
Work Description: gemmellr commented on a change in pull request #3633:
URL: https://github.com/apache/activemq-artemis/pull/3633#discussion_r662334845
##########
File path:
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/IDSupplier.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import io.netty.util.collection.LongObjectHashMap;
+
+/** This interface is meant to encapsulate a HashMap(ListID,
LongObjectHashMap(ElementType)) .
+ * (notice I am using parenthesis instead of < and > to make it easier
to read on the source code)
Review comment:
You can use tags like e.g @code and @literal to avoid needing escapes or
explainers.
##########
File path:
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/IDSupplier.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import io.netty.util.collection.LongObjectHashMap;
+
+/** This interface is meant to encapsulate a HashMap(ListID,
LongObjectHashMap(ElementType)) .
+ * (notice I am using parenthesis instead of < and > to make it easier
to read on the source code)
+ *
+ * ListID should be translated as ServerID when in use by ActiveMQ Artemis.
the ListID will probably be a server UUID.
+ *
+ * The implementation should always provide the same instance of a list for a
ListID. */
+public interface IDSupplier<E> {
+ LongObjectHashMap<LinkedListImpl.Node<E>> getList(Object ListID);
Review comment:
Perhaps getMap? getNodes?
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
##########
@@ -239,13 +225,14 @@ protected void actualDelivery(AMQPMessage message,
Delivery delivery, Receiver r
deleteQueue(SimpleString.toSimpleString(address),
SimpleString.toSimpleString(queueName));
} else if (eventType.equals(POST_ACK)) {
String address = (String)
AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, ADDRESS);
+ String nodeID = (String)
AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, BROKER_ID);
String queueName = (String)
AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, QUEUE);
AmqpValue value = (AmqpValue) message.getBody();
Long messageID = (Long) value.getValue();
if (logger.isDebugEnabled()) {
logger.debug(server + " Post ack address=" + address + "
queueName = " + queueName + " messageID=" + messageID + "(mirrorID=" +
ByteUtil.getFirstByte(messageID) + ", messageID=" +
ByteUtil.removeFirstByte(messageID) + ")");
}
- if (postAcknowledge(address, queueName, messageID,
messageAckOperation)) {
+ if (postAcknowledge(address, queueName, nodeID, messageID,
messageAckOperation)) {
Review comment:
Various debug logging here and elsewhere splitting the long body /
message-id value into components that needs updated or removed.
##########
File path:
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
##########
@@ -92,9 +86,11 @@ public void setIDSupplier(ToLongFunction<E> supplier) {
}
private void putID(E value, Node<E> position) {
- long id = idSupplier.applyAsLong(value);
- if (id >= 0) {
- nodeMap.put(id, position);
+ Object listID = idSupplier.getListID(value);
+ LongObjectHashMap<Node<E>> nodesForList = idSupplier.getList(listID);
+ long theID = idSupplier.getID(value);
+ if (nodesForList != null) {
+ nodesForList.put(theID, position);
Review comment:
pre-existing code, but maybe 'node' instead of 'position' might read a
little better? Woudl be consistent with use in removeWithID also.
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
##########
@@ -387,7 +387,7 @@ public void testNoAddressWithAnnotations() throws Exception
{
server.start();
server_2 = createServer(AMQP_PORT_2, false);
server_2.setIdentity("server_2");
- server_2.getConfiguration().setBrokerMirrorId(2);
+ server_2.getConfiguration();
Review comment:
can this line be deleted now? (and others on lines 694, 702)
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java
##########
@@ -129,11 +129,91 @@ public void testSyncOnCreateQueues() throws Exception {
server.stop();
}
+
+ @Test
+ public void testSingleMessage() throws Exception {
+ server.setIdentity("Server1");
+ server.getConfiguration();
+ {
+ AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" +
AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
+ amqpConnection.addElement(new
AMQPMirrorBrokerConnectionElement().setDurable(true));
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ }
+ server.start();
+
+ server_2 = createServer(AMQP_PORT_2, false);
+ server_2.setIdentity("Server2");
+ server_2.getConfiguration();
Review comment:
can this line be deleted now? (and others on lines 216, 226, 351, 355)
##########
File path:
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java
##########
@@ -20,18 +20,17 @@
import java.util.Comparator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.function.ToLongFunction;
/**
* A priority linked list implementation
* <p>
* It implements this by maintaining an individual LinkedBlockingDeque for
each priority level.
*/
-public class PriorityLinkedListImpl<T> implements PriorityLinkedList<T> {
+public class PriorityLinkedListImpl<E> implements PriorityLinkedList<E> {
Review comment:
Would have been nicer to review actual changes if the type holder was
renamed separately (not suggesting you change it back).
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
##########
@@ -356,24 +343,29 @@ public boolean postAcknowledge(String address, String
queue, long messageID, ACK
logger.trace("Server " + server.getIdentity() + " with queue = " +
queue + " being acked for " + messageID + " coming from " + messageID + "
targetQueue = " + targetQueue);
}
- performAck(messageID, targetQueue, ackMessage, true);
+ performAck(nodeID, messageID, targetQueue, ackMessage, true);
return true;
}
- private void performAck(long messageID, Queue targetQueue,
ACKMessageOperation ackMessageOperation, boolean retry) {
+ private void performAck(Object serverID, long messageID, Queue targetQueue,
ACKMessageOperation ackMessageOperation, boolean retry) {
if (logger.isTraceEnabled()) {
logger.trace("performAck " + messageID + "(messageID=" +
ByteUtil.removeFirstByte(messageID) + "), targetQueue=" +
targetQueue.getName());
}
- MessageReference reference = targetQueue.removeWithSuppliedID(messageID,
referenceIDSupplier);
+ MessageReference reference = targetQueue.removeWithSuppliedID(serverID,
messageID, referenceIDSupplier);
if (reference == null && retry) {
+ targetQueue.forEach((r) -> {
+ System.out.println("reference = " + r);
+ System.out.println("nodeID = " + referenceIDSupplier.getListID(r)
+ " and remoteMessageID = " + referenceIDSupplier.getID(r));
+ });
+
Review comment:
Leftover debug print.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 617677)
Time Spent: 8.5h (was: 8h 20m)
> Enhance AMQP Mirror support with dual mirror
> --------------------------------------------
>
> Key: ARTEMIS-3243
> URL: https://issues.apache.org/jira/browse/ARTEMIS-3243
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Affects Versions: 2.17.0
> Reporter: Clebert Suconic
> Assignee: Clebert Suconic
> Priority: Major
> Fix For: 2.18.0
>
> Time Spent: 8.5h
> Remaining Estimate: 0h
>
> at the current Mirror version, we can only mirror into a single direction.
> With this enhancement the two (or more brokers) would be connected to each
> other, each one having its own ID, and each one would send updates to the
> other broker.
> The outcome is that if you just transferred producers and consumers from one
> broker into the other, the fallback would be automatic and simple. No need to
> disable and enable mirror options.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)