This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 8fc6f09195 ARTEMIS-5063 messageMoved addition in ActiveMQServerMessagePlugin
8fc6f09195 is described below
commit 8fc6f09195b733eae056327b99abd9fe4d0eb788
Author: Jean-Pascal Briquet <[email protected]>
AuthorDate: Wed Sep 25 15:51:24 2024 +0200
ARTEMIS-5063 messageMoved addition in ActiveMQServerMessagePlugin
---
.../artemis/core/server/impl/QueueImpl.java | 4 +++
.../server/plugin/ActiveMQServerMessagePlugin.java | 24 +++++++++++++++++
.../tests/integration/plugin/CorePluginTest.java | 31 ++++++++++++++++++++++
.../integration/plugin/MethodCalledVerifier.java | 18 +++++++++++++
4 files changed, 77 insertions(+)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index ccaeb50a31..d86a512a31 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -3665,6 +3665,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
tx.commit();
}
+ if (server.hasBrokerMessagePlugins()) {
+ server.callBrokerMessagePlugins(plugin -> plugin.messageMoved(tx, ref, reason, address, queueID, consumer, copyMessage, routingStatus));
+ }
+
return routingStatus;
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java
index 93d9fb2a47..dd4ff0115d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java
@@ -290,4 +290,28 @@ public interface ActiveMQServerMessagePlugin extends ActiveMQServerBasePlugin {
//by default call the old method for backwards compatibility
this.messageAcknowledged(ref, reason, consumer);
}
+
+ /**
+ * A message has been moved
+ *
+ * @param tx The transaction associated with the move
+ * @param ref The ref of the message moved
+ * @param reason The move reason
+ * @param destAddress the destination address for the move operation
+ * @param destQueueID the destination queueID for the move operation - this field is optional and can be null
+ * @param consumer the consumer that moved the message - this field is optional and can be null
+ * @param newMessage the new message created by the move operation
+ * @param result routing status of the move operation
+ * @throws ActiveMQException
+ */
+ default void messageMoved(final Transaction tx,
+ final MessageReference ref,
+ final AckReason reason,
+ final SimpleString destAddress,
+ final Long destQueueID,
+ final ServerConsumer consumer,
+ final Message newMessage,
+ final RoutingStatus result) throws ActiveMQException {
+
+ }
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
index 62a40b4b20..1d2db65fb2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
@@ -54,6 +54,7 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -96,6 +97,7 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_UPDATE_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_MOVED;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -412,6 +414,35 @@ public class CorePluginTest extends JMSTestBase {
verifier.validatePluginMethodsEquals(1, BEFORE_UPDATE_ADDRESS, AFTER_UPDATE_ADDRESS);
}
+ @Test
+ public void testMessageMoved() throws Exception {
+ final String queue1Name = "queue1";
+ final String queue2Name = "queue2";
+ createQueue(queue2Name);
+ org.apache.activemq.artemis.core.server.Queue artemisQueue = server.locateQueue(queue1Name);
+ org.apache.activemq.artemis.core.server.Queue artemisQueue2 = server.locateQueue(queue2Name);
+
+ conn = cf.createConnection();
+ conn.start();
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue);
+
+ byte[] msgs = new byte[1024];
+ for (int i = 0; i < msgs.length; i++) {
+ msgs[i] = RandomUtil.randomByte();
+ }
+
+ TextMessage msg1 = sess.createTextMessage(new String(msgs));
+ prod.send(msg1);
+ conn.close();
+
+ artemisQueue.moveReferences(null, artemisQueue2.getAddress(), null);
+ Wait.assertEquals(1L, artemisQueue2::getMessageCount, 2000, 100);
+
+ verifier.validatePluginMethodsEquals(1, MESSAGE_MOVED);
+ }
+
private class AckPluginVerifier implements ActiveMQServerPlugin {
private BiConsumer<ServerConsumer, AckReason> assertion;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
index fdc0c9d26d..f1e2a99bb3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
@@ -87,6 +87,7 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
public static final String AFTER_REMOVE_BINDING = "afterRemoveBinding";
public static final String MESSAGE_EXPIRED = "messageExpired";
public static final String MESSAGE_ACKED = "messageAcknowledged";
+ public static final String MESSAGE_MOVED = "messageMoved";
public static final String BEFORE_SEND = "beforeSend";
public static final String AFTER_SEND = "afterSend";
public static final String ON_SEND_EXCEPTION = "onSendException";
@@ -299,6 +300,23 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
methodCalled(MESSAGE_ACKED);
}
+ @Override
+ public void messageMoved(final Transaction tx,
+ final MessageReference ref,
+ final AckReason reason,
+ final SimpleString destAddress,
+ final Long destQueueID,
+ final ServerConsumer consumer,
+ final Message newMessage,
+ final RoutingStatus result) {
+ Objects.requireNonNull(ref);
+ Objects.requireNonNull(reason);
+ Objects.requireNonNull(destAddress);
+ Objects.requireNonNull(newMessage);
+ Objects.requireNonNull(result);
+ methodCalled(MESSAGE_MOVED);
+ }
+
@Override
public void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct,
boolean noAutoCreateQueue) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact