This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 89698b9  ARTEMIS-2524 Remove message from map in LVQ if it's 
deleted/moved/expired/changed
     new 6a14d42  This closes #2867
89698b9 is described below

commit 89698b9dbcf60912dc33f591b8561c059075262d
Author: Wei Yang <wy96...@gmail.com>
AuthorDate: Mon Oct 21 17:37:32 2019 +0800

    ARTEMIS-2524 Remove message from map in LVQ if it's 
deleted/moved/expired/changed
---
 .../artemis/core/server/impl/LastValueQueue.java   |  8 ++-
 .../artemis/tests/integration/server/LVQTest.java  | 57 ++++++++++++++++++++++
 2 files changed, 60 insertions(+), 5 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 44235b0..5f9c82a 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -182,15 +182,13 @@ public class LastValueQueue extends QueueImpl {
 
    @Override
    protected void refRemoved(MessageReference ref) {
-      if (isNonDestructive()) {
-         removeIfCurrent(ref);
-      }
+      removeIfCurrent(ref);
       super.refRemoved(ref);
    }
 
    @Override
    public void acknowledge(final MessageReference ref, final AckReason reason, 
final ServerConsumer consumer) throws Exception {
-      if (isNonDestructive() && reason == AckReason.EXPIRED || reason == 
AckReason.KILLED) {
+      if (reason == AckReason.EXPIRED || reason == AckReason.KILLED) {
          removeIfCurrent(ref);
       }
       super.acknowledge(ref, reason, consumer);
@@ -201,7 +199,7 @@ public class LastValueQueue extends QueueImpl {
                            MessageReference ref,
                            AckReason reason,
                            ServerConsumer consumer) throws Exception {
-      if (isNonDestructive() && reason == AckReason.EXPIRED || reason == 
AckReason.KILLED) {
+      if (reason == AckReason.EXPIRED || reason == AckReason.KILLED) {
          removeIfCurrent(ref);
       }
       super.acknowledge(tx, ref, reason, consumer);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
index f9df972..3273c16 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
@@ -26,10 +26,13 @@ import 
org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.LastValueQueue;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -691,6 +694,60 @@ public class LVQTest extends ActiveMQTestBase {
       assertTrue(queue.getPersistentSize() > 10 * 1024);
    }
 
+   @Test
+   public void testDeleteReference() throws Exception {
+      ClientProducer producer = clientSession.createProducer(address);
+      ClientMessage m1 = createTextMessage(clientSession, "m1");
+      SimpleString rh = new SimpleString("SMID1");
+      m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
+      m1.setBodyInputStream(createFakeLargeStream(2 * 1024));
+
+      ClientMessage m2 = clientSession.createMessage(true);
+      m2.setBodyInputStream(createFakeLargeStream(10 * 1024));
+      m2.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
+
+      Queue queue = server.locateQueue(qName1);
+      producer.send(m1);
+      LinkedListIterator<MessageReference> browserIterator = 
queue.browserIterator();
+      // Wait for message delivered to queue
+      Wait.assertTrue(() -> browserIterator.hasNext(), 10, 2);
+      long messageId = browserIterator.next().getMessage().getMessageID();
+      browserIterator.close();
+
+      queue.deleteReference(messageId);
+      // Wait for delete tx's afterCommit called
+      Wait.assertEquals(0L, () -> queue.getDeliveringSize(), 10, 2);
+      assertEquals(queue.getPersistentSize(), 0);
+      assertTrue(((LastValueQueue)queue).getLastValueKeys().isEmpty());
+
+      producer.send(m2);
+      // Wait for message delivered to queue
+      Wait.assertTrue(() -> queue.getPersistentSize() > 10 * 1024, 10, 2);
+      assertEquals(queue.getDeliveringSize(), 0);
+   }
+
+   @Test
+   public void testChangeReferencePriority() throws Exception {
+      ClientProducer producer = clientSession.createProducer(address);
+      ClientMessage m1 = createTextMessage(clientSession, "m1");
+      SimpleString rh = new SimpleString("SMID1");
+      m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
+
+      Queue queue = server.locateQueue(qName1);
+      producer.send(m1);
+      LinkedListIterator<MessageReference> browserIterator = 
queue.browserIterator();
+      // Wait for message delivered to queue
+      Wait.assertTrue(() -> browserIterator.hasNext(), 10, 2);
+      long messageId = browserIterator.next().getMessage().getMessageID();
+      browserIterator.close();
+      long oldSize = queue.getPersistentSize();
+
+      assertTrue(queue.changeReferencePriority(messageId, (byte) 1));
+      // Wait for message delivered to queue
+      Wait.assertEquals(oldSize, () -> queue.getPersistentSize(), 10, 2);
+      assertEquals(queue.getDeliveringSize(), 0);
+   }
+
    @Override
    @Before
    public void setUp() throws Exception {

Reply via email to