This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 9946d8e  ARTEMIS-2538 Removing all messages from a huge queue causes 
OOM
     new 6c33a48  This closes #2894
9946d8e is described below

commit 9946d8e63ce46608c3961e59ede076f1d6dfa7e0
Author: brusdev <[email protected]>
AuthorDate: Fri Nov 15 18:17:45 2019 +0100

    ARTEMIS-2538 Removing all messages from a huge queue causes OOM
    
    The PageSubscriptionImpl.cleanupEntries could be locked by the queue
    depage because they are executed with the same executor and the depage
    could be locked by the iterQueue.
    If PageSubscriptionImpl.cleanupEntries is locked, nothing cleans up the
    JournalRecord and PagePositionImpl instances created during iterQueue.
    So removing all messages from a huge queue causes too many
    JournalRecord and PagePositionImpl instances to be retained, until an
    OOM occurs.
    To avoid locking PageSubscriptionImpl.cleanupEntries, the depage is
    executed only if the queue isn't iterating.
---
 .../artemis/core/server/impl/QueueImpl.java        | 85 ++++++++++++----------
 1 file changed, 45 insertions(+), 40 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 9d88e3a..f06e053 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2969,59 +2969,67 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
    private void depage(final boolean scheduleExpiry) {
       depagePending = false;
 
-      synchronized (this) {
-         if (isPaused() || pageIterator == null) {
-            return;
-         }
+      if (!depageLock.tryLock()) {
+         return;
       }
 
-      int maxSize = pageSubscription.getPagingStore().getPageSizeBytes();
-
-      long timeout = System.nanoTime() + 
TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT);
-
-      if (logger.isTraceEnabled()) {
-         logger.trace("QueueMemorySize before depage on queue=" + 
this.getName() + " is " + queueMemorySize.get());
-      }
+      try {
+         synchronized (this) {
+            if (isPaused() || pageIterator == null) {
+               return;
+            }
+         }
 
-      this.directDeliver = false;
+         int maxSize = pageSubscription.getPagingStore().getPageSizeBytes();
 
-      int depaged = 0;
-      while (timeout - System.nanoTime() > 0 && needsDepage()) {
-         int status = pageIterator.tryNext();
-         if (status == 2) {
-            continue;
-         } else if (status == 0) {
-            break;
-         }
+         long timeout = System.nanoTime() + 
TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT);
 
-         depaged++;
-         PagedReference reference = pageIterator.next();
          if (logger.isTraceEnabled()) {
-            logger.trace("Depaging reference " + reference + " on queue " + 
this.getName());
+            logger.trace("QueueMemorySize before depage on queue=" + 
this.getName() + " is " + queueMemorySize.get());
          }
-         addTail(reference, false);
-         pageIterator.remove();
 
-         //We have to increment this here instead of in the iterator so we 
have access to the reference from next()
-         pageSubscription.incrementDeliveredSize(getPersistentSize(reference));
-      }
+         this.directDeliver = false;
 
-      if (logger.isDebugEnabled()) {
-         if (depaged == 0 && queueMemorySize.get() >= maxSize) {
-            logger.debug("Couldn't depage any message as the maxSize on the 
queue was achieved. " + "There are too many pending messages to be acked in 
reference to the page configuration");
+         int depaged = 0;
+         while (timeout - System.nanoTime() > 0 && needsDepage()) {
+            int status = pageIterator.tryNext();
+            if (status == 2) {
+               continue;
+            } else if (status == 0) {
+               break;
+            }
+
+            depaged++;
+            PagedReference reference = pageIterator.next();
+            if (logger.isTraceEnabled()) {
+               logger.trace("Depaging reference " + reference + " on queue " + 
this.getName());
+            }
+            addTail(reference, false);
+            pageIterator.remove();
+
+            //We have to increment this here instead of in the iterator so we 
have access to the reference from next()
+            
pageSubscription.incrementDeliveredSize(getPersistentSize(reference));
          }
 
          if (logger.isDebugEnabled()) {
-            logger.debug("Queue Memory Size after depage on queue=" + 
this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize 
+ ". Depaged " + depaged + " messages, pendingDelivery=" + 
messageReferences.size() + ", intermediateMessageReferences= " + 
intermediateMessageReferences.size() + ", queueDelivering=" + 
deliveringMetrics.getMessageCount());
+            if (depaged == 0 && queueMemorySize.get() >= maxSize) {
+               logger.debug("Couldn't depage any message as the maxSize on the 
queue was achieved. " + "There are too many pending messages to be acked in 
reference to the page configuration");
+            }
 
+            if (logger.isDebugEnabled()) {
+               logger.debug("Queue Memory Size after depage on queue=" + 
this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize 
+ ". Depaged " + depaged + " messages, pendingDelivery=" + 
messageReferences.size() + ", intermediateMessageReferences= " + 
intermediateMessageReferences.size() + ", queueDelivering=" + 
deliveringMetrics.getMessageCount());
+
+            }
          }
-      }
 
-      deliverAsync(true);
+         deliverAsync(true);
 
-      if (depaged > 0 && scheduleExpiry) {
-         // This will just call an executor
-         expireReferences();
+         if (depaged > 0 && scheduleExpiry) {
+            // This will just call an executor
+            expireReferences();
+         }
+      } finally {
+         depageLock.unlock();
       }
    }
 
@@ -3888,13 +3896,10 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
       @Override
       public void run() {
-         depageLock.lock();
          try {
             depage(scheduleExpiry);
          } catch (Exception e) {
             ActiveMQServerLogger.LOGGER.errorDelivering(e);
-         } finally {
-            depageLock.unlock();
          }
       }
    }

Reply via email to