This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new d408f28 ARTEMIS-3238 AMQP Mirror not routing correctly with SNF
d408f28 is described below
commit d408f284b171c361000a0a3876da3a155a8b5255
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Apr 12 14:36:49 2021 -0400
ARTEMIS-3238 AMQP Mirror not routing correctly with SNF
---
.../amqp/connect/AMQPBrokerConnection.java | 10 +--
.../connect/mirror/AMQPMirrorControllerSource.java | 43 ++++++++-----
.../amqp/proton/ProtonServerSenderContext.java | 10 +++
.../core/postoffice/impl/PostOfficeImpl.java | 47 ++++++++------
.../integration/amqp/connect/AMQPReplicaTest.java | 72 ++++++++++++++++++++++
5 files changed, 144 insertions(+), 38 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
index 2068d11..94be0e8 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
@@ -48,6 +48,7 @@ import
org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.BrokerConnection;
import org.apache.activemq.artemis.core.server.Consumer;
+import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
@@ -204,11 +205,11 @@ public class AMQPBrokerConnection implements
ClientConnectionLifeCycleListener,
public void createLink(Queue queue, AMQPBrokerConnectionElement
connectionElement) {
if (connectionElement.getType() == AMQPBrokerConnectionAddressType.PEER)
{
- connectSender(queue, queue.getAddress().toString(),
Symbol.valueOf("qd.waypoint"));
+ connectSender(queue, queue.getAddress().toString(), null,
Symbol.valueOf("qd.waypoint"));
connectReceiver(protonRemotingConnection, session, sessionContext,
queue, Symbol.valueOf("qd.waypoint"));
} else {
if (connectionElement.getType() ==
AMQPBrokerConnectionAddressType.SENDER) {
- connectSender(queue, queue.getAddress().toString());
+ connectSender(queue, queue.getAddress().toString(), null);
}
if (connectionElement.getType() ==
AMQPBrokerConnectionAddressType.RECEIVER) {
connectReceiver(protonRemotingConnection, session, sessionContext,
queue);
@@ -278,7 +279,7 @@ public class AMQPBrokerConnection implements
ClientConnectionLifeCycleListener,
AMQPMirrorBrokerConnectionElement replica =
(AMQPMirrorBrokerConnectionElement)connectionElement;
Queue queue =
server.locateQueue(replica.getSourceMirrorAddress());
- connectSender(queue, ProtonProtocolManager.MIRROR_ADDRESS);
+ connectSender(queue, ProtonProtocolManager.MIRROR_ADDRESS,
(r) -> AMQPMirrorControllerSource.validateProtocolData(r,
replica.getSourceMirrorAddress()));
}
}
}
@@ -457,6 +458,7 @@ public class AMQPBrokerConnection implements
ClientConnectionLifeCycleListener,
private void connectSender(Queue queue,
String targetName,
+ java.util.function.Consumer<? super
MessageReference> beforeDeliver,
Symbol... capabilities) {
if (logger.isDebugEnabled()) {
logger.debug("Connecting outbound for " + queue);
@@ -491,7 +493,7 @@ public class AMQPBrokerConnection implements
ClientConnectionLifeCycleListener,
AMQPOutgoingController outgoingInitializer = new
AMQPOutgoingController(queue, sender, sessionContext.getSessionSPI());
- ProtonServerSenderContext senderContext = new
ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender,
sessionContext, sessionContext.getSessionSPI(), outgoingInitializer);
+ ProtonServerSenderContext senderContext = new
ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender,
sessionContext, sessionContext.getSessionSPI(),
outgoingInitializer).setBeforeDelivery(beforeDeliver);
sessionContext.addSender(sender, senderContext);
} catch (Exception e) {
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index 86dda6d..966a607 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
@@ -155,28 +156,42 @@ public class AMQPMirrorControllerSource implements
MirrorController, ActiveMQCom
MessageReference ref =
MessageReference.Factory.createReference(message, snfQueue);
snfQueue.refUp(ref);
+ refs.add(ref);
+ message.usageUp();
+
+ setProtocolData(ref);
- Map<Symbol, Object> daMap = new HashMap<>();
- DeliveryAnnotations deliveryAnnotations = new
DeliveryAnnotations(daMap);
- daMap.put(INTERNAL_ID, message.getMessageID());
- String address = message.getAddress();
- if (address != null) { // this is the message that was set through
routing
- Properties amqpProperties = getProperties(message);
- if (amqpProperties == null ||
!address.equals(amqpProperties.getTo())) {
- // We set the internal destination property only if we need to
- // otherwise we just use the one already set over Properties
- daMap.put(INTERNAL_DESTINATION, message.getAddress());
- }
+ if (message.isDurable() && snfQueue.isDurable()) {
+ PostOfficeImpl.storeDurableReference(server.getStorageManager(),
message, context.getTransaction(), snfQueue, true);
}
- ref.setProtocolData(deliveryAnnotations);
- refs.add(ref);
- message.usageUp();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
+ public static void validateProtocolData(MessageReference ref, SimpleString
snfAddress) {
+ if (ref.getProtocolData() == null &&
!ref.getMessage().getAddressSimpleString().equals(snfAddress)) {
+ setProtocolData(ref);
+ }
+ }
+
+ private static void setProtocolData(MessageReference ref) {
+ Map<Symbol, Object> daMap = new HashMap<>();
+ DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(daMap);
+ daMap.put(INTERNAL_ID, ref.getMessage().getMessageID());
+ String address = ref.getMessage().getAddress();
+ if (address != null) { // this is the message that was set through
routing
+ Properties amqpProperties = getProperties(ref.getMessage());
+ if (amqpProperties == null ||
!address.equals(amqpProperties.getTo())) {
+ // We set the internal destination property only if we need to
+ // otherwise we just use the one already set over Properties
+ daMap.put(INTERNAL_DESTINATION, ref.getMessage().getAddress());
+ }
+ }
+ ref.setProtocolData(deliveryAnnotations);
+ }
+
private static Properties getProperties(Message message) {
if (message instanceof AMQPMessage) {
return
AMQPMessageBrokerAccessor.getCurrentProperties((AMQPMessage)message);
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index ffcf5af..0e138d8 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -135,6 +135,7 @@ public class ProtonServerSenderContext extends
ProtonInitializable implements Pr
* */
private final Object creditsLock = new Object();
private final java.util.function.Consumer<? super MessageReference>
executeDelivery;
+ private java.util.function.Consumer<? super MessageReference>
beforeDelivery;
private final boolean amqpTreatRejectAsUnmodifiedDeliveryFailed;
public ProtonServerSenderContext(AMQPConnectionContext connection,
@@ -160,6 +161,11 @@ public class ProtonServerSenderContext extends
ProtonInitializable implements Pr
.isAmqpTreatRejectAsUnmodifiedDeliveryFailed();
}
+ public ProtonServerSenderContext
setBeforeDelivery(java.util.function.Consumer<? super MessageReference>
beforeDelivery) {
+ this.beforeDelivery = beforeDelivery;
+ return this;
+ }
+
public Object getBrokerConsumer() {
return brokerConsumer;
}
@@ -492,6 +498,10 @@ public class ProtonServerSenderContext extends
ProtonInitializable implements Pr
return 0;
}
+ if (beforeDelivery != null) {
+ beforeDelivery.accept(messageReference);
+ }
+
try {
synchronized (creditsLock) {
if (sender.getLocalState() == EndpointState.CLOSED) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 365ce01..a4c2605 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1519,7 +1519,7 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
if (store != null && storageManager.addToPage(store, message,
context.getTransaction(), entry.getValue())) {
if (message.isLargeMessage()) {
- confirmLargeMessageSend(tx, message);
+ confirmLargeMessageSend(storageManager, tx, message);
}
// We need to kick delivery so the Queues may check for the
cursors case they are empty
@@ -1595,24 +1595,7 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
refs.add(reference);
queue.refUp(reference);
if (message.isDurable()) {
- final int durableRefCount = queue.durableUp(message);
- if (durableRefCount == 1) {
- if (tx != null) {
- storageManager.storeMessageTransactional(tx.getID(),
message);
- } else {
- storageManager.storeMessage(message);
- }
- if (message.isLargeMessage()) {
- confirmLargeMessageSend(tx, message);
- }
- }
- if (tx != null) {
- storageManager.storeReferenceTransactional(tx.getID(),
queue.getID(), message.getMessageID());
- tx.setContainsPersistent();
- } else {
- final boolean last = i == (durableQueuesCount - 1);
- storageManager.storeReference(queue.getID(),
message.getMessageID(), last);
- }
+ storeDurableReference(storageManager, message, tx, queue,
durableQueuesCount == i);
if (deliveryTime != null && deliveryTime > 0) {
if (tx != null) {
storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference);
@@ -1624,12 +1607,36 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
}
}
+ public static void storeDurableReference(StorageManager storageManager,
Message message,
+ Transaction tx,
+ Queue queue, boolean sync) throws Exception {
+ assert message.isDurable();
+
+ final int durableRefCount = queue.durableUp(message);
+ if (durableRefCount == 1) {
+ if (tx != null) {
+ storageManager.storeMessageTransactional(tx.getID(), message);
+ } else {
+ storageManager.storeMessage(message);
+ }
+ if (message.isLargeMessage()) {
+ confirmLargeMessageSend(storageManager, tx, message);
+ }
+ }
+ if (tx != null) {
+ storageManager.storeReferenceTransactional(tx.getID(), queue.getID(),
message.getMessageID());
+ tx.setContainsPersistent();
+ } else {
+ storageManager.storeReference(queue.getID(), message.getMessageID(),
sync);
+ }
+ }
+
/**
* @param tx
* @param message
* @throws Exception
*/
- private void confirmLargeMessageSend(Transaction tx, final Message message)
throws Exception {
+ private static void confirmLargeMessageSend(StorageManager storageManager,
Transaction tx, final Message message) throws Exception {
LargeServerMessage largeServerMessage = (LargeServerMessage) message;
synchronized (largeServerMessage) {
if (largeServerMessage.getPendingRecordID() >= 0) {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
index 80a08e7..8483d75 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
@@ -446,6 +446,78 @@ public class AMQPReplicaTest extends AmqpClientTestSupport
{
}
}
+ @Test
+ public void testRouteSurviving() throws Exception {
+ testRouteSurvivor(false);
+ }
+
+ @Test
+ public void testRouteSurvivingStop() throws Exception {
+ testRouteSurvivor(true);
+ }
+
+
+ private void testRouteSurvivor(boolean server1Stopped) throws Exception {
+ if (!server1Stopped) {
+ server.start();
+ }
+ server_2 = createServer(AMQP_PORT_2, false);
+ server_2.setIdentity("server_2");
+ server_2.getConfiguration().setName("thisone");
+
+ AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("OtherSide", "tcp://localhost:" +
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+ AMQPMirrorBrokerConnectionElement replica = new
AMQPMirrorBrokerConnectionElement().setSourceMirrorAddress("TheSource");
+ amqpConnection.addElement(replica);
+ server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+ server_2.start();
+
+ // We create the address to avoid auto delete on the queue
+ server_2.addAddressInfo(new
AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+ server_2.createQueue(new
QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
+
+ int NUMBER_OF_MESSAGES = 200;
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP",
"tcp://localhost:" + AMQP_PORT_2);
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer =
session.createProducer(session.createQueue(getQueueName()));
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ producer.send(session.createTextMessage("i=" + i));
+ }
+
+ connection.close();
+
+ {
+ if (!server1Stopped) {
+ Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
+ Queue queueServer1 = server.locateQueue(getQueueName());
+ Wait.assertEquals(NUMBER_OF_MESSAGES,
queueServer1::getMessageCount);
+ }
+ Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
+ Queue queueServer2 = server_2.locateQueue(getQueueName());
+ Wait.assertEquals(NUMBER_OF_MESSAGES, queueServer2::getMessageCount);
+ }
+
+ if (!server1Stopped) {
+ server.stop();
+ }
+ server_2.stop();
+
+ server.start();
+ server_2.start();
+
+
+ Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
+ Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
+ Queue queueServer1 = server.locateQueue(getQueueName());
+ Queue queueServer2 = server_2.locateQueue(getQueueName());
+
+ Wait.assertEquals(NUMBER_OF_MESSAGES, queueServer1::getMessageCount);
+ Wait.assertEquals(NUMBER_OF_MESSAGES, queueServer2::getMessageCount);
+ }
private void replicaTest(boolean largeMessage,