This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push: new 89698b9 ARTEMIS-2524 Remove message from map in LVQ if it's deleted/moved/expired/changed new 6a14d42 This closes #2867 89698b9 is described below commit 89698b9dbcf60912dc33f591b8561c059075262d Author: Wei Yang <wy96...@gmail.com> AuthorDate: Mon Oct 21 17:37:32 2019 +0800 ARTEMIS-2524 Remove message from map in LVQ if it's deleted/moved/expired/changed --- .../artemis/core/server/impl/LastValueQueue.java | 8 ++- .../artemis/tests/integration/server/LVQTest.java | 57 ++++++++++++++++++++++ 2 files changed, 60 insertions(+), 5 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index 44235b0..5f9c82a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -182,15 +182,13 @@ public class LastValueQueue extends QueueImpl { @Override protected void refRemoved(MessageReference ref) { - if (isNonDestructive()) { - removeIfCurrent(ref); - } + removeIfCurrent(ref); super.refRemoved(ref); } @Override public void acknowledge(final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception { - if (isNonDestructive() && reason == AckReason.EXPIRED || reason == AckReason.KILLED) { + if (reason == AckReason.EXPIRED || reason == AckReason.KILLED) { removeIfCurrent(ref); } super.acknowledge(ref, reason, consumer); @@ -201,7 +199,7 @@ public class LastValueQueue extends QueueImpl { MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception { - if (isNonDestructive() && reason == AckReason.EXPIRED || reason == AckReason.KILLED) { + if (reason == AckReason.EXPIRED || reason == AckReason.KILLED) { removeIfCurrent(ref); } super.acknowledge(tx, ref, reason, consumer); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java index f9df972..3273c16 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java @@ -26,10 +26,13 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; +import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.LastValueQueue; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -691,6 +694,60 @@ public class LVQTest extends ActiveMQTestBase { assertTrue(queue.getPersistentSize() > 10 * 1024); } + @Test + public void testDeleteReference() throws Exception { + ClientProducer producer = clientSession.createProducer(address); + ClientMessage m1 = createTextMessage(clientSession, "m1"); + SimpleString rh = new SimpleString("SMID1"); + m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh); + m1.setBodyInputStream(createFakeLargeStream(2 * 1024)); + + ClientMessage m2 = clientSession.createMessage(true); + m2.setBodyInputStream(createFakeLargeStream(10 * 1024)); + m2.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh); + + Queue queue = server.locateQueue(qName1); + producer.send(m1); + LinkedListIterator<MessageReference> browserIterator = queue.browserIterator(); + // Wait for message delivered to queue + Wait.assertTrue(() -> browserIterator.hasNext(), 10, 2); + long messageId = browserIterator.next().getMessage().getMessageID(); + browserIterator.close(); + + queue.deleteReference(messageId); + // Wait for delete tx's afterCommit called + Wait.assertEquals(0L, () -> queue.getDeliveringSize(), 10, 2); + assertEquals(queue.getPersistentSize(), 0); + assertTrue(((LastValueQueue)queue).getLastValueKeys().isEmpty()); + + producer.send(m2); + // Wait for message delivered to queue + Wait.assertTrue(() -> queue.getPersistentSize() > 10 * 1024, 10, 2); + assertEquals(queue.getDeliveringSize(), 0); + } + + @Test + public void testChangeReferencePriority() throws Exception { + ClientProducer producer = clientSession.createProducer(address); + ClientMessage m1 = createTextMessage(clientSession, "m1"); + SimpleString rh = new SimpleString("SMID1"); + m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh); + + Queue queue = server.locateQueue(qName1); + producer.send(m1); + LinkedListIterator<MessageReference> browserIterator = queue.browserIterator(); + // Wait for message delivered to queue + Wait.assertTrue(() -> browserIterator.hasNext(), 10, 2); + long messageId = browserIterator.next().getMessage().getMessageID(); + browserIterator.close(); + long oldSize = queue.getPersistentSize(); + + assertTrue(queue.changeReferencePriority(messageId, (byte) 1)); + // Wait for message delivered to queue + Wait.assertEquals(oldSize, () -> queue.getPersistentSize(), 10, 2); + assertEquals(queue.getDeliveringSize(), 0); + } + @Override @Before public void setUp() throws Exception {