This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 4ea9d25 ARTEMIS-2264 PurgeOnNoConsumers prevent removal of messages
with replication
new 3bfeaa4 This closes #2572
4ea9d25 is described below
commit 4ea9d25ca9461fdbae8d5550f081198ca215a199
Author: Francesco Nigro <[email protected]>
AuthorDate: Tue Feb 26 18:34:27 2019 +0100
ARTEMIS-2264 PurgeOnNoConsumers prevent removal of messages with replication
Added a test reproducer and changed Queue::isDurableMessage usages into
Queue::isDurable to allow acks to hit the journal and be
correctly replicated across nodes.
---
.../artemis/core/postoffice/impl/PostOfficeImpl.java | 2 +-
.../apache/activemq/artemis/core/server/impl/QueueImpl.java | 12 ++++++------
.../artemis/core/server/impl/RoutingContextImpl.java | 2 +-
.../artemis/core/server/impl/ServerConsumerImpl.java | 2 +-
.../core/server/management/impl/ManagementServiceImpl.java | 2 +-
.../integration/cluster/failover/BackupSyncJournalTest.java | 12 ++++++++++++
6 files changed, 22 insertions(+), 10 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index ec7675e..671d4f0 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1669,7 +1669,7 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
for (MessageReference ref : refs) {
Message message = ref.getMessage();
- if (message.isDurable() && ref.getQueue().isDurableMessage()) {
+ if (message.isDurable() && ref.getQueue().isDurable()) {
message.decrementDurableRefCount();
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 79e53a9..57c33ad 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1462,7 +1462,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
} else {
Message message = ref.getMessage();
- boolean durableRef = message.isDurable() && isDurableMessage();
+ boolean durableRef = message.isDurable() && isDurable();
if (durableRef) {
storageManager.storeAcknowledge(id, message.getMessageID());
@@ -1500,7 +1500,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
} else {
Message message = ref.getMessage();
- boolean durableRef = message.isDurable() && isDurableMessage();
+ boolean durableRef = message.isDurable() && isDurable();
if (durableRef) {
storageManager.storeAcknowledgeTransactional(tx.getID(), id,
message.getMessageID());
@@ -1528,7 +1528,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
public void reacknowledge(final Transaction tx, final MessageReference ref)
throws Exception {
Message message = ref.getMessage();
- if (message.isDurable() && isDurableMessage()) {
+ if (message.isDurable() && isDurable()) {
tx.setContainsPersistent();
}
@@ -2760,7 +2760,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
return true;
}
- if (!internalQueue && reference.isDurable() && isDurableMessage() &&
!reference.isPaged()) {
+ if (!internalQueue && reference.isDurable() && isDurable() &&
!reference.isPaged()) {
storageManager.updateDeliveryCount(reference);
}
@@ -2789,7 +2789,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
reference.setScheduledDeliveryTime(timeBase + redeliveryDelay);
- if (!reference.isPaged() && reference.isDurable() &&
isDurableMessage()) {
+ if (!reference.isPaged() && reference.isDurable() && isDurable()) {
storageManager.updateScheduledDeliveryTime(reference);
}
}
@@ -3252,7 +3252,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
if (message == null)
return;
- boolean durableRef = message.isDurable() && queue.isDurableMessage();
+ boolean durableRef = message.isDurable() && queue.isDurable();
try {
message.decrementRefCount();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
index e9df830..2c92763 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
@@ -119,7 +119,7 @@ public final class RoutingContextImpl implements
RoutingContext {
RouteContextList listing = getContextListing(address);
- if (queue.isDurableMessage()) {
+ if (queue.isDurable()) {
listing.getDurableQueues().add(queue);
} else {
listing.getNonDurableQueues().add(queue);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 48e2f06..2dc834b 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -457,7 +457,7 @@ public class ServerConsumerImpl implements ServerConsumer,
ReadyListener {
// If updateDeliveries = false (set by strict-update),
// the updateDeliveryCountAfterCancel would still be updated after
c
if (strictUpdateDeliveryCount && !ref.isPaged()) {
- if (ref.getMessage().isDurable() &&
ref.getQueue().isDurableMessage() &&
+ if (ref.getMessage().isDurable() && ref.getQueue().isDurable()
&&
!ref.getQueue().isInternalQueue() &&
!ref.isPaged()) {
storageManager.updateDeliveryCount(ref);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index 25354da..b0e3a84 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -248,7 +248,7 @@ public class ManagementServiceImpl implements
ManagementService {
QueueControlImpl queueControl = new QueueControlImpl(queue,
addressInfo.getName().toString(), messagingServer, storageManager,
securityStore, addressSettingsRepository);
if (messageCounterManager != null) {
- MessageCounter counter = new
MessageCounter(queue.getName().toString(), null, queue, false,
queue.isDurableMessage(), messageCounterManager.getMaxDayCount());
+ MessageCounter counter = new
MessageCounter(queue.getName().toString(), null, queue, false,
queue.isDurable(), messageCounterManager.getMaxDayCount());
queueControl.setMessageCounter(counter);
messageCounterManager.registerMessageCounter(queue.getName().toString(),
counter);
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
index 8342c62..f1d23be 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
@@ -35,6 +35,8 @@ import
org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
import org.apache.activemq.artemis.api.core.client.FailoverEventType;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
import
org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.Configuration;
@@ -326,6 +328,16 @@ public class BackupSyncJournalTest extends
FailoverTestBase {
assertNoMoreMessages();
}
+ @Test
+ public void testRemoveAllMessageWithPurgeOnNoConsumers() throws Exception {
+ final boolean purgeOnNoConsumers = true;
+ createProducerSendSomeMessages();
+
liveServer.getServer().locateQueue(ADDRESS).setPurgeOnNoConsumers(purgeOnNoConsumers);
+ assertEquals(n_msgs, ((QueueControl)
liveServer.getServer().getManagementService().getResource(ResourceNames.QUEUE +
ADDRESS.toString())).removeAllMessages());
+ startBackupCrashLive();
+ assertNoMoreMessages();
+ }
+
private void startBackupCrashLive() throws Exception {
assertFalse("backup is started?", backupServer.isStarted());
liveServer.removeInterceptor(syncDelay);