Repository: activemq-artemis Updated Branches: refs/heads/1.x 9672dc23e -> 0c5962f1b
ARTEMIS-1328 Improving direct delivery check Based on #1447 as it is not possible to cherry-pick here Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/25c0f93a Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/25c0f93a Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/25c0f93a Branch: refs/heads/1.x Commit: 25c0f93ad50bc82f93cc2a6785203cb1ea366c40 Parents: 9672dc2 Author: Clebert Suconic <[email protected]> Authored: Mon Aug 7 23:48:29 2017 -0400 Committer: Clebert Suconic <[email protected]> Committed: Tue Aug 8 13:00:37 2017 -0400 ---------------------------------------------------------------------- .../artemis/utils/OrderedExecutorFactory.java | 6 ++- .../artemis/core/server/impl/QueueImpl.java | 56 +++++++++++++------- 2 files changed, 42 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25c0f93a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java index f4c85f3..10aa7f6 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java @@ -75,7 +75,7 @@ public final class OrderedExecutorFactory implements ExecutorFactory { * More specifically, any call B to the {@link #execute(Runnable)} method that happens-after another call A to the * same method, will result in B's task running after A's. */ - private static class OrderedExecutor implements Executor { + public static class OrderedExecutor implements Executor { private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>(); private final Executor delegate; @@ -104,6 +104,10 @@ public final class OrderedExecutorFactory implements ExecutorFactory { } } + public boolean isFlushed() { + return stateUpdater.get(this) == STATE_NOT_RUNNING; + } + private final class ExecutorTask implements Runnable { @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25c0f93a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 897fde3..8e338c0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -85,6 +85,7 @@ import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.FutureLatch; import org.apache.activemq.artemis.utils.LinkedListIterator; +import org.apache.activemq.artemis.utils.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.PriorityLinkedList; import org.apache.activemq.artemis.utils.PriorityLinkedListImpl; import org.apache.activemq.artemis.utils.ReferenceCounter; @@ -107,7 +108,7 @@ public class QueueImpl implements Queue { public static final int MAX_DELIVERIES_IN_LOOP = 1000; - public static final int CHECK_QUEUE_SIZE_PERIOD = 100; + public static final int CHECK_QUEUE_SIZE_PERIOD = 1000; /** * If The system gets slow for any reason, this is the maximum time a Delivery or @@ -534,24 +535,27 @@ public class QueueImpl implements Queue { return; } - synchronized (directDeliveryGuard) { - // The checkDirect flag is periodically set to true, if the delivery is specified as direct then this causes the - // directDeliver flag to be re-computed resulting in direct delivery if the queue is empty - // We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue - if (!directDeliver && - direct && - System.currentTimeMillis() - lastDirectDeliveryCheck > CHECK_QUEUE_SIZE_PERIOD) { - lastDirectDeliveryCheck = System.currentTimeMillis(); + if (!directDeliver && direct && System.currentTimeMillis() - lastDirectDeliveryCheck > CHECK_QUEUE_SIZE_PERIOD) { + if (logger.isTraceEnabled()) { + logger.trace("Checking to re-enable direct deliver on queue " + this.getName()); + } + lastDirectDeliveryCheck = System.currentTimeMillis(); + synchronized (directDeliveryGuard) { + // The checkDirect flag is periodically set to true, if the delivery is specified as direct then this causes the + // directDeliver flag to be re-computed resulting in direct delivery if the queue is empty + // We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue - if (intermediateMessageReferences.isEmpty() && - messageReferences.isEmpty() && - !pageIterator.hasNext() && - !pageSubscription.isPaging()) { + if (deliveriesInTransit.getCount() == 0 && isFlushed(getExecutor()) && intermediateMessageReferences.isEmpty() && messageReferences.isEmpty() && !pageIterator.hasNext() && !pageSubscription.isPaging()) { // We must block on the executor to ensure any async deliveries have completed or we might get out of order // deliveries - if (flushExecutor() && flushDeliveriesInTransit()) { - // Go into direct delivery mode - directDeliver = true; + // Go into direct delivery mode + directDeliver = true; + if (logger.isTraceEnabled()) { + logger.trace("Setting direct deliverer to true"); + } + } else { + if (logger.isTraceEnabled()) { + logger.trace("Couldn't set direct deliver back"); } } } @@ -671,9 +675,23 @@ public class QueueImpl implements Queue { flushExecutor(); } + private boolean isFlushed(Executor executor) { + if (executor instanceof OrderedExecutorFactory.OrderedExecutor) { + return ((OrderedExecutorFactory.OrderedExecutor)executor).isFlushed(); + } else { + CountDownLatch latch = new CountDownLatch(1); + executor.execute(latch::countDown); + try { + return latch.await(100, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + return false; + } + } + } + @Override public boolean flushExecutor() { - boolean ok = internalFlushExecutor(10000); + boolean ok = internalFlushExecutor(10000, true); if (!ok) { ActiveMQServerLogger.LOGGER.errorFlushingExecutorsOnQueue(); @@ -682,14 +700,14 @@ public class QueueImpl implements Queue { return ok; } - private boolean internalFlushExecutor(long timeout) { + private boolean internalFlushExecutor(long timeout, boolean log) { FutureLatch future = new FutureLatch(); getExecutor().execute(future); boolean result = future.await(timeout); - if (!result) { + if (log && !result) { ActiveMQServerLogger.LOGGER.queueBusy(this.name.toString(), timeout); } return result;
