This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 8fc6f09195 ARTEMIS-5063 messageMoved addition in ActiveMQServerMessagePlugin
8fc6f09195 is described below

commit 8fc6f09195b733eae056327b99abd9fe4d0eb788
Author: Jean-Pascal Briquet <[email protected]>
AuthorDate: Wed Sep 25 15:51:24 2024 +0200

    ARTEMIS-5063 messageMoved addition in ActiveMQServerMessagePlugin
---
 .../artemis/core/server/impl/QueueImpl.java        |  4 +++
 .../server/plugin/ActiveMQServerMessagePlugin.java | 24 +++++++++++++++++
 .../tests/integration/plugin/CorePluginTest.java   | 31 ++++++++++++++++++++++
 .../integration/plugin/MethodCalledVerifier.java   | 18 +++++++++++++
 4 files changed, 77 insertions(+)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index ccaeb50a31..d86a512a31 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -3665,6 +3665,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          tx.commit();
       }
 
+      if (server.hasBrokerMessagePlugins()) {
+         server.callBrokerMessagePlugins(plugin -> plugin.messageMoved(tx, ref, reason, address, queueID, consumer, copyMessage, routingStatus));
+      }
+
       return routingStatus;
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java
index 93d9fb2a47..dd4ff0115d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java
@@ -290,4 +290,28 @@ public interface ActiveMQServerMessagePlugin extends ActiveMQServerBasePlugin {
       //by default call the old method for backwards compatibility
       this.messageAcknowledged(ref, reason, consumer);
    }
+
+   /**
+    * A message has been moved
+    *
+    * @param tx The transaction associated with the move
+    * @param ref The ref of the message moved
+    * @param reason The move reason
+    * @param destAddress the destination address for the move operation
+    * @param destQueueID the destination queueID for the move operation - this field is optional and can be null
+    * @param consumer the consumer that moved the message - this field is optional and can be null
+    * @param newMessage the new message created by the move operation
+    * @param result routing status of the move operation
+    * @throws ActiveMQException
+    */
+   default void messageMoved(final Transaction tx,
+                             final MessageReference ref,
+                             final AckReason reason,
+                             final SimpleString destAddress,
+                             final Long destQueueID,
+                             final ServerConsumer consumer,
+                             final Message newMessage,
+                             final RoutingStatus result) throws ActiveMQException {
+
+   }
 }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
index 62a40b4b20..1d2db65fb2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
@@ -54,6 +54,7 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
 import org.apache.activemq.artemis.tests.util.JMSTestBase;
 import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -96,6 +97,7 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV
 import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_UPDATE_ADDRESS;
 import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
 import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_MOVED;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -412,6 +414,35 @@ public class CorePluginTest extends JMSTestBase {
       verifier.validatePluginMethodsEquals(1, BEFORE_UPDATE_ADDRESS, AFTER_UPDATE_ADDRESS);
    }
 
+   @Test
+   public void testMessageMoved() throws Exception {
+      final String queue1Name = "queue1";
+      final String queue2Name = "queue2";
+      createQueue(queue2Name);
+      org.apache.activemq.artemis.core.server.Queue artemisQueue = server.locateQueue(queue1Name);
+      org.apache.activemq.artemis.core.server.Queue artemisQueue2 = server.locateQueue(queue2Name);
+
+      conn = cf.createConnection();
+      conn.start();
+      Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      MessageProducer prod = sess.createProducer(queue);
+
+      byte[] msgs = new byte[1024];
+      for (int i = 0; i < msgs.length; i++) {
+         msgs[i] = RandomUtil.randomByte();
+      }
+
+      TextMessage msg1 = sess.createTextMessage(new String(msgs));
+      prod.send(msg1);
+      conn.close();
+
+      artemisQueue.moveReferences(null, artemisQueue2.getAddress(), null);
+      Wait.assertEquals(1L, artemisQueue2::getMessageCount, 2000, 100);
+
+      verifier.validatePluginMethodsEquals(1, MESSAGE_MOVED);
+   }
+
    private class AckPluginVerifier implements ActiveMQServerPlugin {
 
       private BiConsumer<ServerConsumer, AckReason> assertion;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
index fdc0c9d26d..f1e2a99bb3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
@@ -87,6 +87,7 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
    public static final String AFTER_REMOVE_BINDING = "afterRemoveBinding";
    public static final String MESSAGE_EXPIRED = "messageExpired";
    public static final String MESSAGE_ACKED = "messageAcknowledged";
+   public static final String MESSAGE_MOVED = "messageMoved";
    public static final String BEFORE_SEND = "beforeSend";
    public static final String AFTER_SEND = "afterSend";
    public static final String ON_SEND_EXCEPTION = "onSendException";
@@ -299,6 +300,23 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
       methodCalled(MESSAGE_ACKED);
    }
 
+   @Override
+   public void messageMoved(final Transaction tx,
+                            final MessageReference ref,
+                            final AckReason reason,
+                            final SimpleString destAddress,
+                            final Long destQueueID,
+                            final ServerConsumer consumer,
+                            final Message newMessage,
+                            final RoutingStatus result) {
+      Objects.requireNonNull(ref);
+      Objects.requireNonNull(reason);
+      Objects.requireNonNull(destAddress);
+      Objects.requireNonNull(newMessage);
+      Objects.requireNonNull(result);
+      methodCalled(MESSAGE_MOVED);
+   }
+
    @Override
    public void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct,
          boolean noAutoCreateQueue) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to