This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 4ce0dfca2d ARTEMIS-4959 moveMessages operation can move more messages
than max messageCount
4ce0dfca2d is described below
commit 4ce0dfca2d3dcd47a456d77df79ed63790950a9b
Author: Howard Gao <[email protected]>
AuthorDate: Mon Jul 29 21:09:48 2024 +0800
ARTEMIS-4959 moveMessages operation can move more messages than max
messageCount
---
.../core/management/impl/QueueControlImpl.java | 4 ++
.../artemis/core/server/impl/QueueImpl.java | 42 +++++++++-------
.../artemis/tests/util/ActiveMQTestBase.java | 31 ++++++++++++
.../integration/management/QueueControlTest.java | 58 ++++++++++++++++++++++
.../tests/integration/paging/PagingSendTest.java | 35 -------------
5 files changed, 116 insertions(+), 54 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index e3a054a436..86988c1c6c 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -1382,6 +1382,10 @@ public class QueueControlImpl extends AbstractControl
implements QueueControl {
final int messageCount) throws Exception {
// this is a critical task, we need to prevent parallel tasks running
try (AutoCloseable lock = server.managementLock()) {
+ if (this.queue.getName().toString().equals(otherQueueName)) {
+ //doesn't make sense to move messages to itself
+ throw new IllegalArgumentException("Cannot move messages onto itself");
+ }
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.moveMessages(queue, flushLimit, filterStr,
otherQueueName, rejectDuplicates, messageCount);
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 249c7e44a9..720ebd1f06 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2280,7 +2280,6 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
QueueIterateAction messageAction) throws
Exception {
int count = 0;
int txCount = 0;
- Integer expectedHits = messageAction.expectedHits();
// This is to avoid scheduling depaging while iterQueue is happening
// this should minimize the use of the paged executor.
depagePending = true;
@@ -2296,7 +2295,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
try (LinkedListIterator<MessageReference> iter = iterator()) {
- while (iter.hasNext()) {
+ while (iter.hasNext() &&
!messageAction.expectedHitsReached(count)) {
MessageReference ref = iter.next();
if (filter1 == null || filter1.match(ref.getMessage())) {
@@ -2306,9 +2305,6 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
}
txCount++;
count++;
- if (expectedHits != null && count >=
expectedHits.intValue()) {
- break;
- }
}
}
@@ -2320,11 +2316,18 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
txCount = 0;
}
+ if (messageAction.expectedHitsReached(count)) {
+ return count;
+ }
+
List<MessageReference> cancelled =
scheduledDeliveryHandler.cancel(ref -> filter1 == null ? true :
filter1.match(ref.getMessage()));
for (MessageReference messageReference : cancelled) {
messageAction.actMessage(tx, messageReference);
count++;
txCount++;
+ if (messageAction.expectedHitsReached(count)) {
+ break;
+ }
}
if (txCount > 0) {
@@ -2336,7 +2339,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
}
if (pageIterator != null) {
- while (pageIterator.hasNext()) {
+ while (pageIterator.hasNext() &&
!messageAction.expectedHitsReached(count)) {
PagedReference reference = pageIterator.next();
pageIterator.remove();
@@ -2750,12 +2753,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
final Integer expectedHits = messageCount > 0 ? messageCount : null;
final DuplicateIDCache targetDuplicateCache =
postOffice.getDuplicateIDCache(toAddress);
- return iterQueue(flushLimit, filter, new QueueIterateAction() {
- @Override
- public Integer expectedHits() {
- return expectedHits;
- }
-
+ return iterQueue(flushLimit, filter, new
QueueIterateAction(expectedHits) {
@Override
public boolean actMessage(Transaction tx, MessageReference ref)
throws Exception {
boolean ignored = false;
@@ -2814,11 +2812,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
final HashMap<String, Long> queues = new HashMap<>();
- return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
- @Override
- public Integer expectedHits() {
- return expectedHits;
- }
+ return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new
QueueIterateAction(expectedHits) {
@Override
public boolean actMessage(Transaction tx, MessageReference ref)
throws Exception {
@@ -4472,8 +4466,14 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
*/
abstract class QueueIterateAction {
- public Integer expectedHits() {
- return null;
+ protected Integer expectedHits;
+
+ QueueIterateAction(Integer expectedHits) {
+ this.expectedHits = expectedHits;
+ }
+
+ QueueIterateAction() {
+ this.expectedHits = null;
}
/**
@@ -4484,6 +4484,10 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
* @throws Exception
*/
public abstract boolean actMessage(Transaction tx, MessageReference ref)
throws Exception;
+
+ public boolean expectedHitsReached(int currentHits) {
+ return expectedHits != null && currentHits >= expectedHits.intValue();
+ }
}
/* For external use we need to use a synchronized version since the list is
not thread safe */
diff --git
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 217645cdc3..75c6b0b40c 100644
---
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -1692,6 +1692,37 @@ public abstract class ActiveMQTestBase extends
ArtemisTestCase {
}
}
+ public List<String> sendMessageBatch(int batchSize,
+ ClientSession session,
+ SimpleString queueAddr) throws
ActiveMQException {
+ List<String> messageIds = new ArrayList<>();
+ ClientProducer producer = session.createProducer(queueAddr);
+ for (int i = 0; i < batchSize; i++) {
+ ClientMessage message = session.createMessage(true);
+ message.getBodyBuffer().writeBytes(new byte[1024]);
+ String id = UUID.randomUUID().toString();
+ message.putStringProperty("id", id);
+ message.putIntProperty("seq", i); // this is to make the print-data easier to debug
+ messageIds.add(id);
+ producer.send(message);
+ }
+ session.commit();
+
+ return messageIds;
+ }
+
+ public boolean waitForMessages(Queue queue, int count, long timeout) throws
Exception {
+ long timeToWait = System.currentTimeMillis() + timeout;
+
+ while (System.currentTimeMillis() < timeToWait) {
+ if (queue.getMessageCount() >= count) {
+ return true;
+ }
+ Thread.sleep(100);
+ }
+ return false;
+ }
+
protected final ClientMessage createMessage(ClientSession session,
int counter,
boolean durable) throws
ActiveMQException {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 6289bc7719..1d8ba69cd6 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -75,6 +75,7 @@ import
org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.management.impl.QueueControlImpl;
import org.apache.activemq.artemis.core.management.impl.view.ConsumerField;
import
org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
+import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerTestAccessor;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -137,6 +138,63 @@ public class QueueControlTest extends ManagementTestBase {
this.durable = durable;
}
+ @TestTemplate
+ public void testMoveMessagesInPagingMode() throws Exception {
+ final int TOTAL_MESSAGES = 10000;
+ final String DLA = "DLA";
+ ClientSessionFactory sf = createSessionFactory(locator);
+ ClientSession session = sf.createSession(false, false);
+
+ SimpleString queueAddr = SimpleString.of("testQueue");
+ session.createQueue(QueueConfiguration.of(queueAddr).setDurable(durable));
+ SimpleString dlq = SimpleString.of(DLA);
+ session.createQueue(QueueConfiguration.of(dlq));
+
+ // Set up paging on the queue address
+ AddressSettings addressSettings = new
AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(16 *
1024).setDeadLetterAddress(dlq);
+ server.getAddressSettingsRepository().addMatch("#", addressSettings);
+
+ sendMessageBatch(TOTAL_MESSAGES, session, queueAddr);
+
+ Queue queue = server.locateQueue(queueAddr);
+
+ // Give time Queue.deliverAsync to deliver messages
+ assertTrue(waitForMessages(queue, TOTAL_MESSAGES, 5000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+ //invoke moveMessages op
+ String queueControlResourceName = ResourceNames.QUEUE + "testQueue";
+ Object resource =
server.getManagementService().getResource(queueControlResourceName);
+ QueueControl queueControl = (QueueControl) resource;
+ assertEquals(queueControl.getMessageCount(), 10000);
+
+ // move messages to DLQ
+ int count = queueControl.moveMessages(500, "", DLA, false, 500);
+ assertEquals(500, count);
+
+ //messages shouldn't move on to the same queue
+ try {
+ queueControl.moveMessages(1000, "", "testQueue", false, 9000);
+ fail("messages cannot be moved on to the queue itself");
+ } catch (IllegalArgumentException ok) {
+ //ok
+ }
+
+ // 9500 left
+ count = queueControl.moveMessages(1000, "", DLA, false, 9000);
+ assertEquals(9000, count);
+
+ // 500 left, try move 1000
+ count = queueControl.moveMessages(100, "", DLA, false, 1000);
+ assertEquals(500, count);
+
+ // zero left, try move again
+ count = queueControl.moveMessages(100, "", DLA, false, 1000);
+ assertEquals(0, count);
+ }
+
@TestTemplate
public void testGetPreparedTransactionMessageCount() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java
index 671e131c70..50a7aea1d6 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java
@@ -20,16 +20,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import java.util.ArrayList;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@@ -292,25 +288,6 @@ public class PagingSendTest extends ActiveMQTestBase {
}
}
- public List<String> sendMessageBatch(int batchSize,
- ClientSession session,
- SimpleString queueAddr) throws
ActiveMQException {
- List<String> messageIds = new ArrayList<>();
- ClientProducer producer = session.createProducer(queueAddr);
- for (int i = 0; i < batchSize; i++) {
- ClientMessage message = session.createMessage(true);
- message.getBodyBuffer().writeBytes(new byte[1024]);
- String id = UUID.randomUUID().toString();
- message.putStringProperty("id", id);
- message.putIntProperty("seq", i); // this is to make the print-data easier to debug
- messageIds.add(id);
- producer.send(message);
- }
- session.commit();
-
- return messageIds;
- }
-
/**
* checks that there are no message duplicates in the page. Any IDs found
in the ignoreIds field will not be tested
* this allows us to test only those messages that have been sent after the
address has started paging (ignoring any
@@ -337,18 +314,6 @@ public class PagingSendTest extends ActiveMQTestBase {
assertEquals(0, duplicates);
}
- public boolean waitForMessages(Queue queue, int count, long timeout) throws
Exception {
- long timeToWait = System.currentTimeMillis() + timeout;
-
- while (System.currentTimeMillis() < timeToWait) {
- if (queue.getMessageCount() >= count) {
- return true;
- }
- Thread.sleep(100);
- }
- return false;
- }
-
/**
* checks that there are no message duplicates in the page. Any IDs found
in the ignoreIds field will not be tested
* this allows us to test only those messages that have been sent after the
address has started paging (ignoring any
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact