This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new d408f28  ARTEMIS-3238 AMQP Mirror not routing correctly with SNF
d408f28 is described below

commit d408f284b171c361000a0a3876da3a155a8b5255
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Apr 12 14:36:49 2021 -0400

    ARTEMIS-3238 AMQP Mirror not routing correctly with SNF
---
 .../amqp/connect/AMQPBrokerConnection.java         | 10 +--
 .../connect/mirror/AMQPMirrorControllerSource.java | 43 ++++++++-----
 .../amqp/proton/ProtonServerSenderContext.java     | 10 +++
 .../core/postoffice/impl/PostOfficeImpl.java       | 47 ++++++++------
 .../integration/amqp/connect/AMQPReplicaTest.java  | 72 ++++++++++++++++++++++
 5 files changed, 144 insertions(+), 38 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
index 2068d11..94be0e8 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
@@ -48,6 +48,7 @@ import 
org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.BrokerConnection;
 import org.apache.activemq.artemis.core.server.Consumer;
+import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.mirror.MirrorController;
@@ -204,11 +205,11 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
 
    public void createLink(Queue queue, AMQPBrokerConnectionElement 
connectionElement) {
       if (connectionElement.getType() == AMQPBrokerConnectionAddressType.PEER) 
{
-         connectSender(queue, queue.getAddress().toString(), 
Symbol.valueOf("qd.waypoint"));
+         connectSender(queue, queue.getAddress().toString(), null, 
Symbol.valueOf("qd.waypoint"));
          connectReceiver(protonRemotingConnection, session, sessionContext, 
queue, Symbol.valueOf("qd.waypoint"));
       } else {
          if (connectionElement.getType() == 
AMQPBrokerConnectionAddressType.SENDER) {
-            connectSender(queue, queue.getAddress().toString());
+            connectSender(queue, queue.getAddress().toString(), null);
          }
          if (connectionElement.getType() == 
AMQPBrokerConnectionAddressType.RECEIVER) {
             connectReceiver(protonRemotingConnection, session, sessionContext, 
queue);
@@ -278,7 +279,7 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
                   AMQPMirrorBrokerConnectionElement replica = 
(AMQPMirrorBrokerConnectionElement)connectionElement;
                   Queue queue = 
server.locateQueue(replica.getSourceMirrorAddress());
 
-                  connectSender(queue, ProtonProtocolManager.MIRROR_ADDRESS);
+                  connectSender(queue, ProtonProtocolManager.MIRROR_ADDRESS, 
(r) -> AMQPMirrorControllerSource.validateProtocolData(r, 
replica.getSourceMirrorAddress()));
                }
             }
          }
@@ -457,6 +458,7 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
 
    private void connectSender(Queue queue,
                               String targetName,
+                              java.util.function.Consumer<? super 
MessageReference> beforeDeliver,
                               Symbol... capabilities) {
       if (logger.isDebugEnabled()) {
          logger.debug("Connecting outbound for " + queue);
@@ -491,7 +493,7 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
 
             AMQPOutgoingController outgoingInitializer = new 
AMQPOutgoingController(queue, sender, sessionContext.getSessionSPI());
 
-            ProtonServerSenderContext senderContext = new 
ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, 
sessionContext, sessionContext.getSessionSPI(), outgoingInitializer);
+            ProtonServerSenderContext senderContext = new 
ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, 
sessionContext, sessionContext.getSessionSPI(), 
outgoingInitializer).setBeforeDelivery(beforeDeliver);
 
             sessionContext.addSender(sender, senderContext);
          } catch (Exception e) {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index 86dda6d..966a607 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -155,28 +156,42 @@ public class AMQPMirrorControllerSource implements 
MirrorController, ActiveMQCom
 
          MessageReference ref = 
MessageReference.Factory.createReference(message, snfQueue);
          snfQueue.refUp(ref);
+         refs.add(ref);
+         message.usageUp();
+
+         setProtocolData(ref);
 
-         Map<Symbol, Object> daMap = new HashMap<>();
-         DeliveryAnnotations deliveryAnnotations = new 
DeliveryAnnotations(daMap);
-         daMap.put(INTERNAL_ID, message.getMessageID());
-         String address = message.getAddress();
-         if (address != null) { // this is the message that was set through 
routing
-            Properties amqpProperties = getProperties(message);
-            if (amqpProperties == null || 
!address.equals(amqpProperties.getTo())) {
-               // We set the internal destination property only if we need to
-               // otherwise we just use the one already set over Properties
-               daMap.put(INTERNAL_DESTINATION, message.getAddress());
-            }
+         if (message.isDurable() && snfQueue.isDurable()) {
+            PostOfficeImpl.storeDurableReference(server.getStorageManager(), 
message, context.getTransaction(), snfQueue, true);
          }
-         ref.setProtocolData(deliveryAnnotations);
 
-         refs.add(ref);
-         message.usageUp();
       } catch (Throwable e) {
          logger.warn(e.getMessage(), e);
       }
    }
 
+   public static void validateProtocolData(MessageReference ref, SimpleString 
snfAddress) {
+      if (ref.getProtocolData() == null && 
!ref.getMessage().getAddressSimpleString().equals(snfAddress)) {
+         setProtocolData(ref);
+      }
+   }
+
+   private static void setProtocolData(MessageReference ref) {
+      Map<Symbol, Object> daMap = new HashMap<>();
+      DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(daMap);
+      daMap.put(INTERNAL_ID, ref.getMessage().getMessageID());
+      String address = ref.getMessage().getAddress();
+      if (address != null) { // this is the message that was set through 
routing
+         Properties amqpProperties = getProperties(ref.getMessage());
+         if (amqpProperties == null || 
!address.equals(amqpProperties.getTo())) {
+            // We set the internal destination property only if we need to
+            // otherwise we just use the one already set over Properties
+            daMap.put(INTERNAL_DESTINATION, ref.getMessage().getAddress());
+         }
+      }
+      ref.setProtocolData(deliveryAnnotations);
+   }
+
    private static Properties getProperties(Message message) {
       if (message instanceof AMQPMessage) {
          return 
AMQPMessageBrokerAccessor.getCurrentProperties((AMQPMessage)message);
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index ffcf5af..0e138d8 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -135,6 +135,7 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
     * */
    private final Object creditsLock = new Object();
    private final java.util.function.Consumer<? super MessageReference> 
executeDelivery;
+   private java.util.function.Consumer<? super MessageReference> 
beforeDelivery;
    private final boolean amqpTreatRejectAsUnmodifiedDeliveryFailed;
 
    public ProtonServerSenderContext(AMQPConnectionContext connection,
@@ -160,6 +161,11 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
                                                                  
.isAmqpTreatRejectAsUnmodifiedDeliveryFailed();
    }
 
+   public ProtonServerSenderContext 
setBeforeDelivery(java.util.function.Consumer<? super MessageReference> 
beforeDelivery) {
+      this.beforeDelivery = beforeDelivery;
+      return this;
+   }
+
    public Object getBrokerConsumer() {
       return brokerConsumer;
    }
@@ -492,6 +498,10 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
          return 0;
       }
 
+      if (beforeDelivery != null) {
+         beforeDelivery.accept(messageReference);
+      }
+
       try {
          synchronized (creditsLock) {
             if (sender.getLocalState() == EndpointState.CLOSED) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 365ce01..a4c2605 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1519,7 +1519,7 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
 
          if (store != null && storageManager.addToPage(store, message, 
context.getTransaction(), entry.getValue())) {
             if (message.isLargeMessage()) {
-               confirmLargeMessageSend(tx, message);
+               confirmLargeMessageSend(storageManager, tx, message);
             }
 
             // We need to kick delivery so the Queues may check for the 
cursors case they are empty
@@ -1595,24 +1595,7 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
          refs.add(reference);
          queue.refUp(reference);
          if (message.isDurable()) {
-            final int durableRefCount = queue.durableUp(message);
-            if (durableRefCount == 1) {
-               if (tx != null) {
-                  storageManager.storeMessageTransactional(tx.getID(), 
message);
-               } else {
-                  storageManager.storeMessage(message);
-               }
-               if (message.isLargeMessage()) {
-                  confirmLargeMessageSend(tx, message);
-               }
-            }
-            if (tx != null) {
-               storageManager.storeReferenceTransactional(tx.getID(), 
queue.getID(), message.getMessageID());
-               tx.setContainsPersistent();
-            } else {
-               final boolean last = i == (durableQueuesCount - 1);
-               storageManager.storeReference(queue.getID(), 
message.getMessageID(), last);
-            }
+            storeDurableReference(storageManager, message, tx, queue, 
durableQueuesCount == i);
             if (deliveryTime != null && deliveryTime > 0) {
                if (tx != null) {
                   
storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference);
@@ -1624,12 +1607,36 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
       }
    }
 
+   public static void storeDurableReference(StorageManager storageManager, 
Message message,
+                          Transaction tx,
+                          Queue queue, boolean sync) throws Exception {
+      assert message.isDurable();
+
+      final int durableRefCount = queue.durableUp(message);
+      if (durableRefCount == 1) {
+         if (tx != null) {
+            storageManager.storeMessageTransactional(tx.getID(), message);
+         } else {
+            storageManager.storeMessage(message);
+         }
+         if (message.isLargeMessage()) {
+            confirmLargeMessageSend(storageManager, tx, message);
+         }
+      }
+      if (tx != null) {
+         storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), 
message.getMessageID());
+         tx.setContainsPersistent();
+      } else {
+         storageManager.storeReference(queue.getID(), message.getMessageID(), 
sync);
+      }
+   }
+
    /**
     * @param tx
     * @param message
     * @throws Exception
     */
-   private void confirmLargeMessageSend(Transaction tx, final Message message) 
throws Exception {
+   private static void confirmLargeMessageSend(StorageManager storageManager, 
Transaction tx, final Message message) throws Exception {
       LargeServerMessage largeServerMessage = (LargeServerMessage) message;
       synchronized (largeServerMessage) {
          if (largeServerMessage.getPendingRecordID() >= 0) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
index 80a08e7..8483d75 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
@@ -446,6 +446,78 @@ public class AMQPReplicaTest extends AmqpClientTestSupport 
{
       }
    }
 
+   @Test
+   public void testRouteSurviving() throws Exception {
+      testRouteSurvivor(false);
+   }
+
+   @Test
+   public void testRouteSurvivingStop() throws Exception {
+      testRouteSurvivor(true);
+   }
+
+
+   private void testRouteSurvivor(boolean server1Stopped) throws Exception {
+      if (!server1Stopped) {
+         server.start();
+      }
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_2.setIdentity("server_2");
+      server_2.getConfiguration().setName("thisone");
+
+      AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("OtherSide", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+      AMQPMirrorBrokerConnectionElement replica = new 
AMQPMirrorBrokerConnectionElement().setSourceMirrorAddress("TheSource");
+      amqpConnection.addElement(replica);
+      server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+      server_2.start();
+
+      // We create the address to avoid auto delete on the queue
+      server_2.addAddressInfo(new 
AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+      server_2.createQueue(new 
QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
+
+      int NUMBER_OF_MESSAGES = 200;
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", 
"tcp://localhost:" + AMQP_PORT_2);
+      Connection connection = factory.createConnection();
+      Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+      MessageProducer producer = 
session.createProducer(session.createQueue(getQueueName()));
+      producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+      for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+         producer.send(session.createTextMessage("i=" + i));
+      }
+
+      connection.close();
+
+      {
+         if (!server1Stopped) {
+            Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
+            Queue queueServer1 = server.locateQueue(getQueueName());
+            Wait.assertEquals(NUMBER_OF_MESSAGES, 
queueServer1::getMessageCount);
+         }
+         Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
+         Queue queueServer2 = server_2.locateQueue(getQueueName());
+         Wait.assertEquals(NUMBER_OF_MESSAGES, queueServer2::getMessageCount);
+      }
+
+      if (!server1Stopped) {
+         server.stop();
+      }
+      server_2.stop();
+
+      server.start();
+      server_2.start();
+
+
+      Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
+      Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
+      Queue queueServer1 = server.locateQueue(getQueueName());
+      Queue queueServer2 = server_2.locateQueue(getQueueName());
+
+      Wait.assertEquals(NUMBER_OF_MESSAGES, queueServer1::getMessageCount);
+      Wait.assertEquals(NUMBER_OF_MESSAGES, queueServer2::getMessageCount);
+   }
 
 
    private void replicaTest(boolean largeMessage,

Reply via email to