Repository: activemq Updated Branches: refs/heads/master 2a815c2e0 -> 6cc2c1190
AMQ-6361 - fix contention over expiry processing with expiry task and client expiry ack, unit test regression sorted. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6cc2c119 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6cc2c119 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6cc2c119 Branch: refs/heads/master Commit: 6cc2c1190da15579a9a886bad03596e14a77a677 Parents: 2a815c2 Author: gtully <[email protected]> Authored: Fri Jul 22 15:18:31 2016 +0100 Committer: gtully <[email protected]> Committed: Fri Jul 22 15:18:31 2016 +0100 ---------------------------------------------------------------------- .../broker/region/IndirectMessageReference.java | 5 +++++ .../activemq/broker/region/NullMessageReference.java | 5 +++++ .../activemq/broker/region/PrefetchSubscription.java | 4 +++- .../apache/activemq/broker/region/RegionBroker.java | 14 +------------- .../java/org/apache/activemq/broker/region/Topic.java | 8 +++++--- .../region/cursors/FilePendingMessageCursor.java | 2 +- .../activemq/broker/region/MessageReference.java | 2 ++ .../java/org/apache/activemq/command/Message.java | 7 +++++++ .../broker/region/cursors/OrderPendingListTest.java | 5 +++++ .../region/cursors/PrioritizedPendingListTest.java | 5 +++++ .../apache/activemq/usecases/ExpiredMessagesTest.java | 1 - 11 files changed, 39 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/6cc2c119/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java index c1b5f3c..104dcb0 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java @@ -204,4 +204,9 @@ public class IndirectMessageReference implements QueueMessageReference { public boolean isAdvisory() { return message.isAdvisory(); } + + @Override + public boolean canProcessAsExpired() { + return message.canProcessAsExpired(); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/6cc2c119/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java index bef9b23..e8d26a8 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java @@ -154,4 +154,9 @@ public final class NullMessageReference implements QueueMessageReference { return false; } + @Override + public boolean canProcessAsExpired() { + return false; + } + } http://git-wip-us.apache.org/repos/asf/activemq/blob/6cc2c119/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index bc04566..5254440 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -323,7 +323,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } if (inAckRange) { Destination regionDestination = nodeDest; - regionDestination.messageExpired(context, this, node); + if (broker.isExpired(node)) { + regionDestination.messageExpired(context, this, node); + } iter.remove(); nodeDest.getDestinationStatistics().getInflight().decrement(); http://git-wip-us.apache.org/repos/asf/activemq/blob/6cc2c119/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 961618d..69e0930 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -735,19 +735,7 @@ public class RegionBroker extends EmptyBroker { @Override public boolean isExpired(MessageReference messageReference) { - boolean expired = false; - if (messageReference.isExpired()) { - try { - // prevent duplicate expiry processing - Message message = messageReference.getMessage(); - synchronized (message) { - expired = stampAsExpired(message); - } - } catch (IOException e) { - LOG.warn("unexpected exception on message expiry determination for: {}", messageReference, e); - } - } - return expired; + return messageReference.canProcessAsExpired(); } private boolean stampAsExpired(Message message) throws IOException { http://git-wip-us.apache.org/repos/asf/activemq/blob/6cc2c119/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index c43f55e..bf803c1 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -527,9 +527,11 @@ public class Topic extends BaseDestination implements Task { // It could take while before we receive the commit // operation.. by that time the message could have // expired.. - if (broker.isExpired(message)) { - getDestinationStatistics().getExpired().increment(); - broker.messageExpired(context, message, null); + if (message.isExpired()) { + if (broker.isExpired(message)) { + getDestinationStatistics().getExpired().increment(); + broker.messageExpired(context, message, null); + } message.decrementReferenceCount(); return; } http://git-wip-us.apache.org/repos/asf/activemq/blob/6cc2c119/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java index 5bd0cda..20b2bc5 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java @@ -483,7 +483,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple private void discardExpiredMessage(MessageReference reference) { LOG.debug("Discarding expired message {}", reference); - if (broker.isExpired(reference)) { + if (reference.isExpired() && broker.isExpired(reference)) { ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext()); context.setBroker(broker); ((Destination)reference.getRegionDestination()).messageExpired(context, null, new IndirectMessageReference(reference.getMessage())); http://git-wip-us.apache.org/repos/asf/activemq/blob/6cc2c119/activemq-client/src/main/java/org/apache/activemq/broker/region/MessageReference.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/broker/region/MessageReference.java b/activemq-client/src/main/java/org/apache/activemq/broker/region/MessageReference.java index 64c91fa..50d23ed 100755 --- a/activemq-client/src/main/java/org/apache/activemq/broker/region/MessageReference.java +++ b/activemq-client/src/main/java/org/apache/activemq/broker/region/MessageReference.java @@ -63,5 +63,7 @@ public interface MessageReference { * @return true if the message is an advisory */ boolean isAdvisory(); + + boolean canProcessAsExpired(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/6cc2c119/activemq-client/src/main/java/org/apache/activemq/command/Message.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/command/Message.java b/activemq-client/src/main/java/org/apache/activemq/command/Message.java index a500768..83f3201 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/Message.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/Message.java @@ -24,6 +24,7 @@ import java.io.OutputStream; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.zip.DeflaterOutputStream; import javax.jms.JMSException; @@ -94,6 +95,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess private transient ActiveMQConnection connection; transient MessageDestination regionDestination; transient MemoryUsage memoryUsage; + transient AtomicBoolean processAsExpired = new AtomicBoolean(false); private BrokerId[] brokerPath; private BrokerId[] cluster; @@ -837,4 +839,9 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess } return super.toString(overrideFields); } + + @Override + public boolean canProcessAsExpired() { + return processAsExpired.compareAndSet(false, true); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/6cc2c119/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java index 827c39b..74c7686 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java @@ -498,5 +498,10 @@ public class OrderPendingListTest { public boolean isAdvisory() { return false; } + + @Override + public boolean canProcessAsExpired() { + return false; + } } } http://git-wip-us.apache.org/repos/asf/activemq/blob/6cc2c119/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java index 8ecc951..f95fb97 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java @@ -349,5 +349,10 @@ public class PrioritizedPendingListTest { public boolean isAdvisory() { return false; } + + @Override + public boolean canProcessAsExpired() { + return false; + } } } http://git-wip-us.apache.org/repos/asf/activemq/blob/6cc2c119/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java index 0205599..0187aad 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java @@ -34,7 +34,6 @@ import org.slf4j.LoggerFactory; import javax.jms.*; -import java.io.File; import java.util.concurrent.atomic.AtomicLong; import static org.apache.activemq.TestSupport.getDestination;
