This is an automated email from the ASF dual-hosted git repository. mattrpav pushed a commit to branch activemq-5.16.x in repository https://gitbox.apache.org/repos/asf/activemq.git
commit 6a25d654f22c034bbb4e5d23f931507e911d0308 Author: Matt Pavlovich <[email protected]> AuthorDate: Mon Dec 20 08:38:32 2021 -0600 [AMQ-8397] Add a destination option to sendDuplicateFromStoreToDLQ (#724) - Default 'true' to match existing behavior - Added counter to DestinationView --- .../apache/activemq/broker/jmx/DestinationView.java | 9 +++++++++ .../activemq/broker/jmx/DestinationViewMBean.java | 20 ++++++++++++++++++++ .../activemq/broker/region/BaseDestination.java | 14 +++++++++++++- .../apache/activemq/broker/region/Destination.java | 4 ++++ .../activemq/broker/region/DestinationFilter.java | 10 ++++++++++ .../broker/region/DestinationStatistics.java | 11 +++++++++++ .../activemq/broker/region/policy/PolicyEntry.java | 17 ++++++++++++++++- 7 files changed, 83 insertions(+), 2 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java index d795c4d..db0c67f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java @@ -103,6 +103,11 @@ public class DestinationView implements DestinationViewMBean { } @Override + public long getDuplicateFromStoreCount() { + return destination.getDestinationStatistics().getDuplicateFromStore().getCount(); + } + + @Override public long getInFlightCount() { return destination.getDestinationStatistics().getInflight().getCount(); } @@ -570,4 +575,8 @@ public class DestinationView implements DestinationViewMBean { return destination.getDestinationStatistics().getBlockedTime().getTotalTime(); } + @Override + public boolean isSendDuplicateFromStoreToDLQ() { + return destination.isSendDuplicateFromStoreToDLQ(); + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java index ad0ae32..526e648 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java @@ -70,6 +70,26 @@ public interface DestinationViewMBean { long getDequeueCount(); /** + * Returns the number of duplicate messages that have been paged-in + * from the store. + * + * @return The number of duplicate messages that have been paged-in + * from the store. + */ + @MBeanInfo("Number of duplicate messages that have been paged-in from the store.") + long getDuplicateFromStoreCount(); + + /** + * Returns the config setting to send a duplicate message from store + * to the dead letter queue. + * + * @return The config setting to send a duplicate message from store + * to the dead letter queue. + */ + @MBeanInfo("Config setting to send a duplicate from store message to the dead letter queue.") + boolean isSendDuplicateFromStoreToDLQ(); + + /** * Returns the number of messages that have been acknowledged by network subscriptions from the * destination. 
* diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 443a8f2..55b3cba 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -88,6 +88,7 @@ public abstract class BaseDestination implements Destination { private boolean advisoryForDelivery; private boolean advisoryForConsumed; private boolean sendAdvisoryIfNoConsumers; + private boolean sendDuplicateFromStoreToDLQ = true; private boolean includeBodyForAdvisory; protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); protected final BrokerService brokerService; @@ -477,6 +478,14 @@ public abstract class BaseDestination implements Destination { this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers; } + public boolean isSendDuplicateFromStoreToDLQ() { + return this.sendDuplicateFromStoreToDLQ; + } + + public void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ) { + this.sendDuplicateFromStoreToDLQ = sendDuplicateFromStoreToDLQ; + } + public boolean isIncludeBodyForAdvisory() { return includeBodyForAdvisory; } @@ -889,11 +898,14 @@ public abstract class BaseDestination implements Destination { @Override public void duplicateFromStore(Message message, Subscription subscription) { + destinationStatistics.getDuplicateFromStore().increment(); ConnectionContext connectionContext = createConnectionContext(); getLog().warn("{}{}, redirecting {} for dlq processing", DUPLICATE_FROM_STORE_MSG_PREFIX, destination, message.getMessageId()); Throwable cause = new Throwable(DUPLICATE_FROM_STORE_MSG_PREFIX + destination); message.setRegionDestination(this); - broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause); + if(this.isSendDuplicateFromStoreToDLQ()) { + 
broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause); + } MessageAck messageAck = new MessageAck(message, MessageAck.POISON_ACK_TYPE, 1); messageAck.setPoisonCause(cause); try { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java index 031015e..180cf25 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java @@ -245,4 +245,8 @@ public interface Destination extends Service, Task, Message.MessageDestination { public void clearPendingMessages(int pendingAdditionsCount); void duplicateFromStore(Message message, Subscription subscription); + + boolean isSendDuplicateFromStoreToDLQ(); + + void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java index 0d0db05..2c11bc3 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java @@ -394,6 +394,16 @@ public class DestinationFilter implements Destination { next.duplicateFromStore(message, subscription); } + @Override + public boolean isSendDuplicateFromStoreToDLQ() { + return next.isSendDuplicateFromStoreToDLQ(); + } + + @Override + public void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ) { + next.setSendDuplicateFromStoreToDLQ(sendDuplicateFromStoreToDLQ); + } + public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception { if (next instanceof DestinationFilter) { DestinationFilter filter = (DestinationFilter) next; diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java index 0a9176e..88dd911 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java @@ -37,6 +37,7 @@ public class DestinationStatistics extends StatsImpl { protected CountStatisticImpl messages; protected PollCountStatisticImpl messagesCached; protected CountStatisticImpl dispatched; + protected CountStatisticImpl duplicateFromStore; protected CountStatisticImpl inflight; protected CountStatisticImpl expired; protected TimeStatisticImpl processTime; @@ -50,6 +51,7 @@ public class DestinationStatistics extends StatsImpl { enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination"); dispatched = new CountStatisticImpl("dispatched", "The number of messages that have been dispatched from the destination"); dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been acknowledged from the destination"); + duplicateFromStore = new CountStatisticImpl("duplicateFromStore", "The number of duplicate messages that have been paged-in from the store for this destination"); forwards = new CountStatisticImpl("forwards", "The number of messages that have been forwarded to a networked broker from the destination"); inflight = new CountStatisticImpl("inflight", "The number of messages dispatched but awaiting acknowledgement"); expired = new CountStatisticImpl("expired", "The number of messages that have expired"); @@ -68,6 +70,7 @@ public class DestinationStatistics extends StatsImpl { addStatistic("enqueues", enqueues); addStatistic("dispatched", dispatched); addStatistic("dequeues", dequeues); + addStatistic("duplicateFromStore", duplicateFromStore); addStatistic("inflight", inflight); 
addStatistic("expired", expired); addStatistic("consumers", consumers); @@ -124,6 +127,10 @@ public class DestinationStatistics extends StatsImpl { return dispatched; } + public CountStatisticImpl getDuplicateFromStore() { + return duplicateFromStore; + } + public TimeStatisticImpl getProcessTime() { return this.processTime; } @@ -145,6 +152,7 @@ public class DestinationStatistics extends StatsImpl { dequeues.reset(); forwards.reset(); dispatched.reset(); + duplicateFromStore.reset(); inflight.reset(); expired.reset(); blockedSends.reset(); @@ -158,6 +166,7 @@ public class DestinationStatistics extends StatsImpl { enqueues.setEnabled(enabled); dispatched.setEnabled(enabled); dequeues.setEnabled(enabled); + duplicateFromStore.setEnabled(enabled); forwards.setEnabled(enabled); inflight.setEnabled(enabled); expired.setEnabled(true); @@ -177,6 +186,7 @@ public class DestinationStatistics extends StatsImpl { enqueues.setParent(parent.enqueues); dispatched.setParent(parent.dispatched); dequeues.setParent(parent.dequeues); + duplicateFromStore.setParent(parent.duplicateFromStore); forwards.setParent(parent.forwards); inflight.setParent(parent.inflight); expired.setParent(parent.expired); @@ -192,6 +202,7 @@ public class DestinationStatistics extends StatsImpl { enqueues.setParent(null); dispatched.setParent(null); dequeues.setParent(null); + duplicateFromStore.setParent(null); forwards.setParent(null); inflight.setParent(null); expired.setParent(null); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index ab869b0..862d5dd 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -51,6 +51,7 @@ public class PolicyEntry extends DestinationMapEntry { private DispatchPolicy dispatchPolicy; private 
SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; private boolean sendAdvisoryIfNoConsumers; + private boolean sendDuplicateFromStoreToDLQ = true; private DeadLetterStrategy deadLetterStrategy = Destination.DEFAULT_DEAD_LETTER_STRATEGY; private PendingMessageLimitStrategy pendingMessageLimitStrategy; private MessageEvictionStrategy messageEvictionStrategy; @@ -241,7 +242,6 @@ public class PolicyEntry extends DestinationMapEntry { if (isUpdate("maxBrowsePageSize", includedProperties)) { destination.setMaxBrowsePageSize(getMaxBrowsePageSize()); } - if (isUpdate("minimumMessageSize", includedProperties)) { destination.setMinimumMessageSize((int) getMinimumMessageSize()); } @@ -296,6 +296,9 @@ public class PolicyEntry extends DestinationMapEntry { if (isUpdate("sendAdvisoryIfNoConsumers", includedProperties)) { destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers()); } + if (isUpdate("sendDuplicateFromStoreToDLQ", includedProperties)) { + destination.setSendDuplicateFromStoreToDLQ(isSendDuplicateFromStoreToDLQ()); + } } public void baseConfiguration(Broker broker, BaseDestination destination) { @@ -456,6 +459,18 @@ public class PolicyEntry extends DestinationMapEntry { this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers; } + public boolean isSendDuplicateFromStoreToDLQ() { + return sendDuplicateFromStoreToDLQ; + } + + /** + * Sends a copy of the message to the DLQ when a duplicate message is paged in from + * the message store. + */ + public void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ) { + this.sendDuplicateFromStoreToDLQ = sendDuplicateFromStoreToDLQ; + } + public DeadLetterStrategy getDeadLetterStrategy() { return deadLetterStrategy; }
