This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 9946d8e ARTEMIS-2538 Removing all messages from a huge queue causes
OOM
new 6c33a48 This closes #2894
9946d8e is described below
commit 9946d8e63ce46608c3961e59ede076f1d6dfa7e0
Author: brusdev <[email protected]>
AuthorDate: Fri Nov 15 18:17:45 2019 +0100
ARTEMIS-2538 Removing all messages from a huge queue causes OOM
PageSubscriptionImpl.cleanupEntries could be blocked by the queue depage,
because both are executed on the same executor, and the depage could in
turn be blocked by iterQueue.
If PageSubscriptionImpl.cleanupEntries is blocked, nothing cleans up the
JournalRecord and PagePositionImpl instances created during iterQueue.
So removing all messages from a huge queue causes the retention of too many
JournalRecord and PagePositionImpl instances, eventually leading to an OOM.
To avoid blocking PageSubscriptionImpl.cleanupEntries, the depage is
executed only if the queue isn't iterating.
---
.../artemis/core/server/impl/QueueImpl.java | 85 ++++++++++++----------
1 file changed, 45 insertions(+), 40 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 9d88e3a..f06e053 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2969,59 +2969,67 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
private void depage(final boolean scheduleExpiry) {
depagePending = false;
- synchronized (this) {
- if (isPaused() || pageIterator == null) {
- return;
- }
+ if (!depageLock.tryLock()) {
+ return;
}
- int maxSize = pageSubscription.getPagingStore().getPageSizeBytes();
-
- long timeout = System.nanoTime() +
TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT);
-
- if (logger.isTraceEnabled()) {
- logger.trace("QueueMemorySize before depage on queue=" +
this.getName() + " is " + queueMemorySize.get());
- }
+ try {
+ synchronized (this) {
+ if (isPaused() || pageIterator == null) {
+ return;
+ }
+ }
- this.directDeliver = false;
+ int maxSize = pageSubscription.getPagingStore().getPageSizeBytes();
- int depaged = 0;
- while (timeout - System.nanoTime() > 0 && needsDepage()) {
- int status = pageIterator.tryNext();
- if (status == 2) {
- continue;
- } else if (status == 0) {
- break;
- }
+ long timeout = System.nanoTime() +
TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT);
- depaged++;
- PagedReference reference = pageIterator.next();
if (logger.isTraceEnabled()) {
- logger.trace("Depaging reference " + reference + " on queue " +
this.getName());
+ logger.trace("QueueMemorySize before depage on queue=" +
this.getName() + " is " + queueMemorySize.get());
}
- addTail(reference, false);
- pageIterator.remove();
- //We have to increment this here instead of in the iterator so we
have access to the reference from next()
- pageSubscription.incrementDeliveredSize(getPersistentSize(reference));
- }
+ this.directDeliver = false;
- if (logger.isDebugEnabled()) {
- if (depaged == 0 && queueMemorySize.get() >= maxSize) {
- logger.debug("Couldn't depage any message as the maxSize on the
queue was achieved. " + "There are too many pending messages to be acked in
reference to the page configuration");
+ int depaged = 0;
+ while (timeout - System.nanoTime() > 0 && needsDepage()) {
+ int status = pageIterator.tryNext();
+ if (status == 2) {
+ continue;
+ } else if (status == 0) {
+ break;
+ }
+
+ depaged++;
+ PagedReference reference = pageIterator.next();
+ if (logger.isTraceEnabled()) {
+ logger.trace("Depaging reference " + reference + " on queue " +
this.getName());
+ }
+ addTail(reference, false);
+ pageIterator.remove();
+
+ //We have to increment this here instead of in the iterator so we
have access to the reference from next()
+
pageSubscription.incrementDeliveredSize(getPersistentSize(reference));
}
if (logger.isDebugEnabled()) {
- logger.debug("Queue Memory Size after depage on queue=" +
this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize
+ ". Depaged " + depaged + " messages, pendingDelivery=" +
messageReferences.size() + ", intermediateMessageReferences= " +
intermediateMessageReferences.size() + ", queueDelivering=" +
deliveringMetrics.getMessageCount());
+ if (depaged == 0 && queueMemorySize.get() >= maxSize) {
+ logger.debug("Couldn't depage any message as the maxSize on the
queue was achieved. " + "There are too many pending messages to be acked in
reference to the page configuration");
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Queue Memory Size after depage on queue=" +
this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize
+ ". Depaged " + depaged + " messages, pendingDelivery=" +
messageReferences.size() + ", intermediateMessageReferences= " +
intermediateMessageReferences.size() + ", queueDelivering=" +
deliveringMetrics.getMessageCount());
+
+ }
}
- }
- deliverAsync(true);
+ deliverAsync(true);
- if (depaged > 0 && scheduleExpiry) {
- // This will just call an executor
- expireReferences();
+ if (depaged > 0 && scheduleExpiry) {
+ // This will just call an executor
+ expireReferences();
+ }
+ } finally {
+ depageLock.unlock();
}
}
@@ -3888,13 +3896,10 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
@Override
public void run() {
- depageLock.lock();
try {
depage(scheduleExpiry);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorDelivering(e);
- } finally {
- depageLock.unlock();
}
}
}