This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new cb355bb  ARTEMIS-2515 pageIterator.hasNext spends too much time in the case of no messages matched
     new 714e31b  This closes #2861
cb355bb is described below

commit cb355bb5847789449988dc4343532628389bc30e
Author: Wei Yang <wy96...@gmail.com>
AuthorDate: Mon Sep 23 19:50:40 2019 +0800

    ARTEMIS-2515 pageIterator.hasNext spends too much time in the case of no messages matched
---
 .../artemis/core/paging/cursor/PageIterator.java   |  3 ++
 .../core/paging/cursor/PageSubscription.java       |  2 +-
 .../paging/cursor/impl/PageSubscriptionImpl.java   | 45 ++++++++++++++++------
 .../artemis/core/server/impl/QueueImpl.java        | 41 ++++++++++++--------
 4 files changed, 64 insertions(+), 27 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageIterator.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageIterator.java
index 1732133..ce7ddb7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageIterator.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageIterator.java
@@ -22,4 +22,7 @@ import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 public interface PageIterator extends LinkedListIterator<PagedReference> {
 
    void redeliver(PagePosition reference);
+
+   // return 0 if no elements, 1 if having more elements, 2 if taking too long to find
+   int tryNext();
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
index 33092d5..eb41e63 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
@@ -55,7 +55,7 @@ public interface PageSubscription {
     */
    boolean isPaging();
 
-   LinkedListIterator<PagedReference> iterator();
+   PageIterator iterator();
 
    LinkedListIterator<PagedReference> iterator(boolean jumpRemoves);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index 7fee465..423b588 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -47,6 +47,7 @@ import 
org.apache.activemq.artemis.core.paging.cursor.PagePosition;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
 import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
+import org.apache.activemq.artemis.core.paging.cursor.PagedReferenceImpl;
 import org.apache.activemq.artemis.core.paging.impl.Page;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@@ -61,10 +62,14 @@ import 
org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
 import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
 import org.jboss.logging.Logger;
 
+import static org.apache.activemq.artemis.core.server.impl.QueueImpl.DELIVERY_TIMEOUT;
+
 public final class PageSubscriptionImpl implements PageSubscription {
 
    private static final Logger logger = 
Logger.getLogger(PageSubscriptionImpl.class);
 
+   private static final PagedReference dummyPagedRef = new PagedReferenceImpl(null, null, null);
+
    private boolean empty = true;
 
    // Number of scheduled cleanups, to avoid too many schedules
@@ -1323,7 +1328,13 @@ public final class PageSubscriptionImpl implements 
PageSubscription {
             PagePositionAndFileOffset lastPosition = position;
             PagePositionAndFileOffset tmpPosition = position;
 
+            long timeout = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT);
+
             do {
+               if (System.nanoTime() - timeout > 0) {
+                  return dummyPagedRef;
+               }
+
                synchronized (redeliveries) {
                   PagePosition redelivery = redeliveries.poll();
 
@@ -1363,6 +1374,8 @@ public final class PageSubscriptionImpl implements 
PageSubscription {
 
                PageCursorInfo info = 
getPageInfo(message.getPosition().getPageNr());
 
+               position = tmpPosition;
+
                if (!browsing && info != null && 
(info.isRemoved(message.getPosition()) || info.getCompleteInfo() != null)) {
                   continue;
                }
@@ -1398,8 +1411,6 @@ public final class PageSubscriptionImpl implements 
PageSubscription {
                   }
                }
 
-               position = tmpPosition;
-
                if (valid) {
                   match = match(message.getMessage());
 
@@ -1420,24 +1431,36 @@ public final class PageSubscriptionImpl implements 
PageSubscription {
          }
       }
 
-      /**
-       * QueueImpl::deliver could be calling hasNext while QueueImpl.depage 
could be using next and hasNext as well.
-       * It would be a rare race condition but I would prefer avoiding that 
scenario
-       */
       @Override
-      public synchronized boolean hasNext() {
+      public synchronized int tryNext() {
          // if an unbehaved program called hasNext twice before next, we only cache it once.
          if (cachedNext != null) {
-            return true;
+            return 1;
          }
 
          if (!pageStore.isPaging()) {
-            return false;
+            return 0;
          }
 
-         cachedNext = next();
+         PagedReference pagedReference = next();
+         if (pagedReference == dummyPagedRef) {
+            return 2;
+         } else {
+            cachedNext = pagedReference;
+            return cachedNext == null ? 0 : 1;
+         }
+      }
 
-         return cachedNext != null;
+      /**
+       * QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be using next and hasNext as well.
+       * It would be a rare race condition but I would prefer avoiding that scenario
+       */
+      @Override
+      public synchronized boolean hasNext() {
+         int status;
+         while ((status = tryNext()) == 2) {
+         }
+         return status == 0 ? false : true;
       }
 
       @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 881d398..f21e7ac 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -56,6 +56,7 @@ import 
org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.core.PriorityAware;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.paging.cursor.PageIterator;
 import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
@@ -173,7 +174,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
    private ReferenceCounter refCountForConsumers;
 
-   private final LinkedListIterator<PagedReference> pageIterator;
+   private final PageIterator pageIterator;
 
    private volatile boolean printErrorExpiring = false;
 
@@ -1123,9 +1124,13 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
    @Override
    public void deliverAsync() {
+      deliverAsync(false);
+   }
+
+   private void deliverAsync(boolean noWait) {
       if (scheduledRunners.get() < MAX_SCHEDULED_RUNNERS) {
          scheduledRunners.incrementAndGet();
-         checkDepage();
+         checkDepage(noWait);
          try {
             getExecutor().execute(deliverRunner);
          } catch (RejectedExecutionException ignored) {
@@ -1133,7 +1138,6 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
             scheduledRunners.decrementAndGet();
          }
       }
-
    }
 
    @Override
@@ -2279,7 +2283,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
          }
 
          // If empty we need to schedule depaging to make sure we would depage 
expired messages as well
-         if ((!hasElements || expired) && pageIterator != null && 
pageIterator.hasNext()) {
+         if ((!hasElements || expired) && pageIterator != null && pageIterator.tryNext() > 0) {
             scheduleDepage(true);
          }
       }
@@ -2656,7 +2660,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
          if (added++ > MAX_DELIVERIES_IN_LOOP) {
             // if we just keep polling from the intermediate we could starve 
in case there's a sustained load
-            deliverAsync();
+            deliverAsync(true);
             return;
          }
       }
@@ -2680,24 +2684,24 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
       int handled = 0;
 
-      long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
+      long timeout = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT);
       consumers.reset();
       while (true) {
          if (handled == MAX_DELIVERIES_IN_LOOP) {
             // Schedule another one - we do this to prevent a single thread 
getting caught up in this loop for too
             // long
 
-            deliverAsync();
+            deliverAsync(true);
 
             return false;
          }
 
-         if (System.currentTimeMillis() > timeout) {
+         if (System.nanoTime() - timeout > 0) {
             if (logger.isTraceEnabled()) {
                logger.trace("delivery has been running for too long. 
Scheduling another delivery task now");
             }
 
-            deliverAsync();
+            deliverAsync(true);
 
             return false;
          }
@@ -2841,8 +2845,8 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       refRemoved(ref);
    }
 
-   private void checkDepage() {
-      if (pageIterator != null && pageSubscription.isPaging() && 
!depagePending && needsDepage() && pageIterator.hasNext()) {
+   private void checkDepage(boolean noWait) {
+      if (pageIterator != null && pageSubscription.isPaging() && !depagePending && needsDepage() && (noWait ? pageIterator.tryNext() > 0 : pageIterator.hasNext())) {
          scheduleDepage(false);
       }
    }
@@ -2933,7 +2937,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
       int maxSize = pageSubscription.getPagingStore().getPageSizeBytes();
 
-      long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
+      long timeout = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT);
 
       if (logger.isTraceEnabled()) {
          logger.trace("QueueMemorySize before depage on queue=" + 
this.getName() + " is " + queueMemorySize.get());
@@ -2942,7 +2946,14 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       this.directDeliver = false;
 
       int depaged = 0;
-      while (timeout > System.currentTimeMillis() && needsDepage() && 
pageIterator.hasNext()) {
+      while (timeout - System.nanoTime() > 0 && needsDepage()) {
+         int status = pageIterator.tryNext();
+         if (status == 2) {
+            continue;
+         } else if (status == 0) {
+            break;
+         }
+
          depaged++;
          PagedReference reference = pageIterator.next();
          if (logger.isTraceEnabled()) {
@@ -2966,7 +2977,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
          }
       }
 
-      deliverAsync();
+      deliverAsync(true);
 
       if (depaged > 0 && scheduleExpiry) {
          // This will just call an executor
@@ -3815,7 +3826,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
             if (needCheckDepage) {
                enterCritical(CRITICAL_CHECK_DEPAGE);
                try {
-                  checkDepage();
+                  checkDepage(true);
                } finally {
                   leaveCritical(CRITICAL_CHECK_DEPAGE);
                }

Reply via email to