This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ea9d25  ARTEMIS-2264 PurgeOnNoConsumers prevent removal of messages 
with replication
     new 3bfeaa4  This closes #2572
4ea9d25 is described below

commit 4ea9d25ca9461fdbae8d5550f081198ca215a199
Author: Francesco Nigro <[email protected]>
AuthorDate: Tue Feb 26 18:34:27 2019 +0100

    ARTEMIS-2264 PurgeOnNoConsumers prevent removal of messages with replication
    
    Added test reproducer and changed Queue::isDurableMessage usages into
    Queue::isDurable to allow acks to hit the journal and being
    correctly replicated across nodes.
---
 .../artemis/core/postoffice/impl/PostOfficeImpl.java         |  2 +-
 .../apache/activemq/artemis/core/server/impl/QueueImpl.java  | 12 ++++++------
 .../artemis/core/server/impl/RoutingContextImpl.java         |  2 +-
 .../artemis/core/server/impl/ServerConsumerImpl.java         |  2 +-
 .../core/server/management/impl/ManagementServiceImpl.java   |  2 +-
 .../integration/cluster/failover/BackupSyncJournalTest.java  | 12 ++++++++++++
 6 files changed, 22 insertions(+), 10 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index ec7675e..671d4f0 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1669,7 +1669,7 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
          for (MessageReference ref : refs) {
             Message message = ref.getMessage();
 
-            if (message.isDurable() && ref.getQueue().isDurableMessage()) {
+            if (message.isDurable() && ref.getQueue().isDurable()) {
                message.decrementDurableRefCount();
             }
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 79e53a9..57c33ad 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1462,7 +1462,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
          } else {
             Message message = ref.getMessage();
 
-            boolean durableRef = message.isDurable() && isDurableMessage();
+            boolean durableRef = message.isDurable() && isDurable();
 
             if (durableRef) {
                storageManager.storeAcknowledge(id, message.getMessageID());
@@ -1500,7 +1500,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       } else {
          Message message = ref.getMessage();
 
-         boolean durableRef = message.isDurable() && isDurableMessage();
+         boolean durableRef = message.isDurable() && isDurable();
 
          if (durableRef) {
             storageManager.storeAcknowledgeTransactional(tx.getID(), id, 
message.getMessageID());
@@ -1528,7 +1528,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
    public void reacknowledge(final Transaction tx, final MessageReference ref) 
throws Exception {
       Message message = ref.getMessage();
 
-      if (message.isDurable() && isDurableMessage()) {
+      if (message.isDurable() && isDurable()) {
          tx.setContainsPersistent();
       }
 
@@ -2760,7 +2760,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
          return true;
       }
 
-      if (!internalQueue && reference.isDurable() && isDurableMessage() && 
!reference.isPaged()) {
+      if (!internalQueue && reference.isDurable() && isDurable() && 
!reference.isPaged()) {
          storageManager.updateDeliveryCount(reference);
       }
 
@@ -2789,7 +2789,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
             reference.setScheduledDeliveryTime(timeBase + redeliveryDelay);
 
-            if (!reference.isPaged() && reference.isDurable() && 
isDurableMessage()) {
+            if (!reference.isPaged() && reference.isDurable() && isDurable()) {
                storageManager.updateScheduledDeliveryTime(reference);
             }
          }
@@ -3252,7 +3252,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       if (message == null)
          return;
 
-      boolean durableRef = message.isDurable() && queue.isDurableMessage();
+      boolean durableRef = message.isDurable() && queue.isDurable();
 
       try {
          message.decrementRefCount();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
index e9df830..2c92763 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
@@ -119,7 +119,7 @@ public final class RoutingContextImpl implements 
RoutingContext {
 
       RouteContextList listing = getContextListing(address);
 
-      if (queue.isDurableMessage()) {
+      if (queue.isDurable()) {
          listing.getDurableQueues().add(queue);
       } else {
          listing.getNonDurableQueues().add(queue);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 48e2f06..2dc834b 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -457,7 +457,7 @@ public class ServerConsumerImpl implements ServerConsumer, 
ReadyListener {
             // If updateDeliveries = false (set by strict-update),
             // the updateDeliveryCountAfterCancel would still be updated after 
c
             if (strictUpdateDeliveryCount && !ref.isPaged()) {
-               if (ref.getMessage().isDurable() && 
ref.getQueue().isDurableMessage() &&
+               if (ref.getMessage().isDurable() && ref.getQueue().isDurable() 
&&
                   !ref.getQueue().isInternalQueue() &&
                   !ref.isPaged()) {
                   storageManager.updateDeliveryCount(ref);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index 25354da..b0e3a84 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -248,7 +248,7 @@ public class ManagementServiceImpl implements 
ManagementService {
 
       QueueControlImpl queueControl = new QueueControlImpl(queue, 
addressInfo.getName().toString(), messagingServer, storageManager, 
securityStore, addressSettingsRepository);
       if (messageCounterManager != null) {
-         MessageCounter counter = new 
MessageCounter(queue.getName().toString(), null, queue, false, 
queue.isDurableMessage(), messageCounterManager.getMaxDayCount());
+         MessageCounter counter = new 
MessageCounter(queue.getName().toString(), null, queue, false, 
queue.isDurable(), messageCounterManager.getMaxDayCount());
          queueControl.setMessageCounter(counter);
          
messageCounterManager.registerMessageCounter(queue.getName().toString(), 
counter);
       }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
index 8342c62..f1d23be 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
@@ -35,6 +35,8 @@ import 
org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
 import org.apache.activemq.artemis.api.core.client.FailoverEventType;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import 
org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
 import org.apache.activemq.artemis.core.config.Configuration;
@@ -326,6 +328,16 @@ public class BackupSyncJournalTest extends 
FailoverTestBase {
       assertNoMoreMessages();
    }
 
+   @Test
+   public void testRemoveAllMessageWithPurgeOnNoConsumers() throws Exception {
+      final boolean purgeOnNoConsumers = true;
+      createProducerSendSomeMessages();
+      
liveServer.getServer().locateQueue(ADDRESS).setPurgeOnNoConsumers(purgeOnNoConsumers);
+      assertEquals(n_msgs, ((QueueControl) 
liveServer.getServer().getManagementService().getResource(ResourceNames.QUEUE + 
ADDRESS.toString())).removeAllMessages());
+      startBackupCrashLive();
+      assertNoMoreMessages();
+   }
+
    private void startBackupCrashLive() throws Exception {
       assertFalse("backup is started?", backupServer.isStarted());
       liveServer.removeInterceptor(syncDelay);

Reply via email to