This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 4ce0dfca2d ARTEMIS-4959 moveMessages operation can move more messages 
than max messageCount
4ce0dfca2d is described below

commit 4ce0dfca2d3dcd47a456d77df79ed63790950a9b
Author: Howard Gao <[email protected]>
AuthorDate: Mon Jul 29 21:09:48 2024 +0800

    ARTEMIS-4959 moveMessages operation can move more messages than max 
messageCount
---
 .../core/management/impl/QueueControlImpl.java     |  4 ++
 .../artemis/core/server/impl/QueueImpl.java        | 42 +++++++++-------
 .../artemis/tests/util/ActiveMQTestBase.java       | 31 ++++++++++++
 .../integration/management/QueueControlTest.java   | 58 ++++++++++++++++++++++
 .../tests/integration/paging/PagingSendTest.java   | 35 -------------
 5 files changed, 116 insertions(+), 54 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index e3a054a436..86988c1c6c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -1382,6 +1382,10 @@ public class QueueControlImpl extends AbstractControl 
implements QueueControl {
                            final int messageCount) throws Exception {
       // this is a critical task, we need to prevent parallel tasks running
       try (AutoCloseable lock = server.managementLock()) {
+         if (this.queue.getName().toString().equals(otherQueueName)) {
+            //doesn't make sense to move messages to itself
+            throw new IllegalArgumentException("Cannot move messages onto 
itself");
+         }
          if (AuditLogger.isBaseLoggingEnabled()) {
             AuditLogger.moveMessages(queue, flushLimit, filterStr, 
otherQueueName, rejectDuplicates, messageCount);
          }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 249c7e44a9..720ebd1f06 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2280,7 +2280,6 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
                                       QueueIterateAction messageAction) throws 
Exception {
       int count = 0;
       int txCount = 0;
-      Integer expectedHits = messageAction.expectedHits();
       // This is to avoid scheduling depaging while iterQueue is happening
       // this should minimize the use of the paged executor.
       depagePending = true;
@@ -2296,7 +2295,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
             try (LinkedListIterator<MessageReference> iter = iterator()) {
 
-               while (iter.hasNext()) {
+               while (iter.hasNext() && 
!messageAction.expectedHitsReached(count)) {
                   MessageReference ref = iter.next();
 
                   if (filter1 == null || filter1.match(ref.getMessage())) {
@@ -2306,9 +2305,6 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
                      }
                      txCount++;
                      count++;
-                     if (expectedHits != null && count >= 
expectedHits.intValue()) {
-                        break;
-                     }
                   }
                }
 
@@ -2320,11 +2316,18 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
                   txCount = 0;
                }
 
+               if (messageAction.expectedHitsReached(count)) {
+                  return count;
+               }
+
                List<MessageReference> cancelled = 
scheduledDeliveryHandler.cancel(ref -> filter1 == null ? true : 
filter1.match(ref.getMessage()));
                for (MessageReference messageReference : cancelled) {
                   messageAction.actMessage(tx, messageReference);
                   count++;
                   txCount++;
+                  if (messageAction.expectedHitsReached(count)) {
+                     break;
+                  }
                }
 
                if (txCount > 0) {
@@ -2336,7 +2339,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
          }
 
          if (pageIterator != null) {
-            while (pageIterator.hasNext()) {
+            while (pageIterator.hasNext() && 
!messageAction.expectedHitsReached(count)) {
                PagedReference reference = pageIterator.next();
                pageIterator.remove();
 
@@ -2750,12 +2753,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       final Integer expectedHits = messageCount > 0 ? messageCount : null;
       final DuplicateIDCache targetDuplicateCache = 
postOffice.getDuplicateIDCache(toAddress);
 
-      return iterQueue(flushLimit, filter, new QueueIterateAction() {
-         @Override
-         public Integer expectedHits() {
-            return expectedHits;
-         }
-
+      return iterQueue(flushLimit, filter, new 
QueueIterateAction(expectedHits) {
          @Override
          public boolean actMessage(Transaction tx, MessageReference ref) 
throws Exception {
             boolean ignored = false;
@@ -2814,11 +2812,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
       final HashMap<String, Long> queues = new HashMap<>();
 
-      return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
-         @Override
-         public Integer expectedHits() {
-            return expectedHits;
-         }
+      return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new 
QueueIterateAction(expectedHits) {
 
          @Override
          public boolean actMessage(Transaction tx, MessageReference ref) 
throws Exception {
@@ -4472,8 +4466,14 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
     */
    abstract class QueueIterateAction {
 
-      public Integer expectedHits() {
-         return null;
+      protected Integer expectedHits;
+
+      QueueIterateAction(Integer expectedHits) {
+         this.expectedHits = expectedHits;
+      }
+
+      QueueIterateAction() {
+         this.expectedHits = null;
       }
 
       /**
@@ -4484,6 +4484,10 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
        * @throws Exception
        */
       public abstract boolean actMessage(Transaction tx, MessageReference ref) 
throws Exception;
+
+      public boolean expectedHitsReached(int currentHits) {
+         return expectedHits != null && currentHits >= expectedHits.intValue();
+      }
    }
 
    /* For external use we need to use a synchronized version since the list is 
not thread safe */
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 217645cdc3..75c6b0b40c 100644
--- 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -1692,6 +1692,37 @@ public abstract class ActiveMQTestBase extends 
ArtemisTestCase {
       }
    }
 
+   public List<String> sendMessageBatch(int batchSize,
+                                        ClientSession session,
+                                        SimpleString queueAddr) throws 
ActiveMQException {
+      List<String> messageIds = new ArrayList<>();
+      ClientProducer producer = session.createProducer(queueAddr);
+      for (int i = 0; i < batchSize; i++) {
+         ClientMessage message = session.createMessage(true);
+         message.getBodyBuffer().writeBytes(new byte[1024]);
+         String id = UUID.randomUUID().toString();
+         message.putStringProperty("id", id);
+         message.putIntProperty("seq", i); // this is to make the print-data 
easier to debug
+         messageIds.add(id);
+         producer.send(message);
+      }
+      session.commit();
+
+      return messageIds;
+   }
+
+   public boolean waitForMessages(Queue queue, int count, long timeout) throws 
Exception {
+      long timeToWait = System.currentTimeMillis() + timeout;
+
+      while (System.currentTimeMillis() < timeToWait) {
+         if (queue.getMessageCount() >= count) {
+            return true;
+         }
+         Thread.sleep(100);
+      }
+      return false;
+   }
+
    protected final ClientMessage createMessage(ClientSession session,
                                                int counter,
                                                boolean durable) throws 
ActiveMQException {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 6289bc7719..1d8ba69cd6 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -75,6 +75,7 @@ import 
org.apache.activemq.artemis.core.config.DivertConfiguration;
 import org.apache.activemq.artemis.core.management.impl.QueueControlImpl;
 import org.apache.activemq.artemis.core.management.impl.view.ConsumerField;
 import 
org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
+import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.paging.impl.PagingManagerTestAccessor;
 import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -137,6 +138,63 @@ public class QueueControlTest extends ManagementTestBase {
       this.durable = durable;
    }
 
+   @TestTemplate
+   public void testMoveMessagesInPagingMode() throws Exception {
+      final int TOTAL_MESSAGES = 10000;
+      final String DLA = "DLA";
+      ClientSessionFactory sf = createSessionFactory(locator);
+      ClientSession session = sf.createSession(false, false);
+
+      SimpleString queueAddr = SimpleString.of("testQueue");
+      
session.createQueue(QueueConfiguration.of(queueAddr).setDurable(durable));
+      SimpleString dlq = SimpleString.of(DLA);
+      session.createQueue(QueueConfiguration.of(dlq));
+
+      // Set up paging on the queue address
+      AddressSettings addressSettings = new 
AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(16 * 
1024).setDeadLetterAddress(dlq);
+      server.getAddressSettingsRepository().addMatch("#", addressSettings);
+
+      sendMessageBatch(TOTAL_MESSAGES, session, queueAddr);
+
+      Queue queue = server.locateQueue(queueAddr);
+
+      // Give time Queue.deliverAsync to deliver messages
+      assertTrue(waitForMessages(queue, TOTAL_MESSAGES, 5000));
+
+      PagingStore queuePagingStore = queue.getPagingStore();
+      assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+      //invoke moveMessages op
+      String queueControlResourceName = ResourceNames.QUEUE + "testQueue";
+      Object resource = 
server.getManagementService().getResource(queueControlResourceName);
+      QueueControl queueControl = (QueueControl) resource;
+      assertEquals(queueControl.getMessageCount(), 10000);
+
+      // move messages to DLQ
+      int count = queueControl.moveMessages(500, "", DLA, false, 500);
+      assertEquals(500, count);
+
+      //messages shouldn't move on to the same queue
+      try {
+         queueControl.moveMessages(1000, "", "testQueue", false, 9000);
+         fail("messages cannot be moved on to the queue itself");
+      } catch (IllegalArgumentException ok) {
+         //ok
+      }
+
+      // 9500 left
+      count = queueControl.moveMessages(1000, "", DLA, false, 9000);
+      assertEquals(9000, count);
+
+      // 500 left, try move 1000
+      count = queueControl.moveMessages(100, "", DLA, false, 1000);
+      assertEquals(500, count);
+
+      // zero left, try move again
+      count = queueControl.moveMessages(100, "", DLA, false, 1000);
+      assertEquals(0, count);
+   }
+
    @TestTemplate
    public void testGetPreparedTransactionMessageCount() throws Exception {
       SimpleString address = RandomUtil.randomSimpleString();
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java
index 671e131c70..50a7aea1d6 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java
@@ -20,16 +20,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@@ -292,25 +288,6 @@ public class PagingSendTest extends ActiveMQTestBase {
       }
    }
 
-   public List<String> sendMessageBatch(int batchSize,
-                                        ClientSession session,
-                                        SimpleString queueAddr) throws 
ActiveMQException {
-      List<String> messageIds = new ArrayList<>();
-      ClientProducer producer = session.createProducer(queueAddr);
-      for (int i = 0; i < batchSize; i++) {
-         ClientMessage message = session.createMessage(true);
-         message.getBodyBuffer().writeBytes(new byte[1024]);
-         String id = UUID.randomUUID().toString();
-         message.putStringProperty("id", id);
-         message.putIntProperty("seq", i); // this is to make the print-data 
easier to debug
-         messageIds.add(id);
-         producer.send(message);
-      }
-      session.commit();
-
-      return messageIds;
-   }
-
    /**
     * checks that there are no message duplicates in the page.  Any IDs found 
in the ignoreIds field will not be tested
     * this allows us to test only those messages that have been sent after the 
address has started paging (ignoring any
@@ -337,18 +314,6 @@ public class PagingSendTest extends ActiveMQTestBase {
       assertEquals(0, duplicates);
    }
 
-   public boolean waitForMessages(Queue queue, int count, long timeout) throws 
Exception {
-      long timeToWait = System.currentTimeMillis() + timeout;
-
-      while (System.currentTimeMillis() < timeToWait) {
-         if (queue.getMessageCount() >= count) {
-            return true;
-         }
-         Thread.sleep(100);
-      }
-      return false;
-   }
-
    /**
     * checks that there are no message duplicates in the page.  Any IDs found 
in the ignoreIds field will not be tested
     * this allows us to test only those messages that have been sent after the 
address has started paging (ignoring any


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to