http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index 2c4db3e..1c4038b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -722,7 +722,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled refs.remove(message.getMessageID()); // The delivering count should also be decreased as to avoid inconsistencies - ((QueueImpl) ref.getQueue()).decDelivering(); + ((QueueImpl) ref.getQueue()).decDelivering(ref); } connectionFailed(e, false);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index 90b8814..2620cf9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; @@ -116,7 +117,7 @@ public class LastValueQueue extends QueueImpl { } else { // We keep the current ref and ack the one we are returning - super.referenceHandled(); + super.referenceHandled(ref); try { super.acknowledge(ref); @@ -139,7 +140,7 @@ public class LastValueQueue extends QueueImpl { private void replaceLVQMessage(MessageReference ref, HolderReference hr) { MessageReference oldRef = hr.getReference(); - referenceHandled(); + referenceHandled(ref); try { oldRef.acknowledge(); @@ -323,6 +324,11 @@ public class LastValueQueue extends QueueImpl { public Long getConsumerId() { return this.consumerId; } + + @Override + public long getPersistentSize() throws ActiveMQException { + return ref.getPersistentSize(); + } } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java 
---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java index 7543ba5..2802740 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server.impl; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; @@ -158,7 +159,7 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm @Override public void handled() { - queue.referenceHandled(); + queue.referenceHandled(this); } @Override @@ -239,4 +240,9 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm public int hashCode() { return this.getMessage().hashCode(); } + + @Override + public long getPersistentSize() throws ActiveMQException { + return this.getMessage().getPersistentSize(); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index cfd06d9..89ea2fc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -23,6 +23,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; @@ -364,16 +365,25 @@ public class PostOfficeJournalLoader implements JournalLoader { List<PagedMessage> pgMessages = pg.read(storageManager); Map<Long, AtomicInteger> countsPerQueueOnPage = new HashMap<>(); + Map<Long, AtomicLong> sizePerQueueOnPage = new HashMap<>(); for (PagedMessage pgd : pgMessages) { if (pgd.getTransactionID() <= 0) { for (long q : pgd.getQueueIDs()) { AtomicInteger countQ = countsPerQueueOnPage.get(q); + AtomicLong sizeQ = sizePerQueueOnPage.get(q); if (countQ == null) { countQ = new AtomicInteger(0); countsPerQueueOnPage.put(q, countQ); } + if (sizeQ == null) { + sizeQ = new AtomicLong(0); + sizePerQueueOnPage.put(q, sizeQ); + } countQ.incrementAndGet(); + if (pgd.getPersistentSize() > 0) { + sizeQ.addAndGet(pgd.getPersistentSize()); + } } } } @@ -387,12 +397,13 @@ public class PostOfficeJournalLoader implements JournalLoader { PageSubscriptionCounter counter = store.getCursorProvider().getSubscription(entry.getKey()).getCounter(); AtomicInteger value = countsPerQueueOnPage.get(entry.getKey()); + AtomicLong sizeValue = sizePerQueueOnPage.get(entry.getKey()); if (value == null) { logger.debug("Page " + entry.getKey() + " wasn't open, so we will just ignore"); } else { logger.debug("Replacing counter " + value.get()); - counter.increment(txRecoverCounter, value.get()); + counter.increment(txRecoverCounter, value.get(), sizeValue.get()); } } } else { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java 
---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index dbf79e2..5530179 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -171,6 +171,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { // The estimate of memory being consumed by this queue. Used to calculate instances of messages to depage private final AtomicInteger queueMemorySize = new AtomicInteger(0); + private final QueuePendingMessageMetrics pendingMetrics = new QueuePendingMessageMetrics(this); + + private final QueuePendingMessageMetrics deliveringMetrics = new QueuePendingMessageMetrics(this); + // used to control if we should recalculate certain positions inside deliverAsync private volatile boolean consumersChanged = true; @@ -186,8 +190,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private AtomicLong messagesKilled = new AtomicLong(0); - protected final AtomicInteger deliveringCount = new AtomicInteger(0); - private boolean paused; private long pauseStatusRecord = -1; @@ -452,7 +454,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { this.server = server; - scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor); + scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor, this); if (addressSettingsRepository != null) { addressSettingsRepositoryListener = new AddressSettingsRepositoryListener(); @@ -1118,10 +1120,45 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (pageSubscription != null) { // messageReferences will have depaged messages which we need to discount from the counter as they 
are // counted on the pageSubscription as well - return messageReferences.size() + getScheduledCount() + deliveringCount.get() + pageSubscription.getMessageCount(); + return pendingMetrics.getMessageCount() + getScheduledCount() + getDeliveringCount() + pageSubscription.getMessageCount(); + } else { + return pendingMetrics.getMessageCount() + getScheduledCount() + getDeliveringCount(); + } + } + + @Override + public long getPersistentSize() { + if (pageSubscription != null) { + // messageReferences will have depaged messages which we need to discount from the counter as they are + // counted on the pageSubscription as well + return pendingMetrics.getPersistentSize() + getScheduledSize() + getDeliveringSize() + pageSubscription.getPersistentSize(); } else { - return messageReferences.size() + getScheduledCount() + deliveringCount.get(); + return pendingMetrics.getPersistentSize() + getScheduledSize() + getDeliveringSize(); + } + } + + @Override + public long getDurableMessageCount() { + if (isDurable()) { + if (pageSubscription != null) { + return pendingMetrics.getDurableMessageCount() + getDurableScheduledCount() + getDurableDeliveringCount() + pageSubscription.getMessageCount(); + } else { + return pendingMetrics.getDurableMessageCount() + getDurableScheduledCount() + getDurableDeliveringCount(); + } + } + return 0; + } + + @Override + public long getDurablePersistentSize() { + if (isDurable()) { + if (pageSubscription != null) { + return pendingMetrics.getDurablePersistentSize() + getDurableScheduledSize() + getDurableDeliveringSize() + pageSubscription.getPersistentSize(); + } else { + return pendingMetrics.getDurablePersistentSize() + getDurableScheduledSize() + getDurableDeliveringSize(); + } } + return 0; } @Override @@ -1130,6 +1167,21 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } @Override + public synchronized long getScheduledSize() { + return scheduledDeliveryHandler.getScheduledSize(); + } + + @Override + public 
synchronized int getDurableScheduledCount() { + return scheduledDeliveryHandler.getDurableScheduledCount(); + } + + @Override + public synchronized long getDurableScheduledSize() { + return scheduledDeliveryHandler.getDurableScheduledSize(); + } + + @Override public synchronized List<MessageReference> getScheduledMessages() { return scheduledDeliveryHandler.getScheduledReferences(); } @@ -1153,7 +1205,22 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public int getDeliveringCount() { - return deliveringCount.get(); + return deliveringMetrics.getMessageCount(); + } + + @Override + public long getDeliveringSize() { + return deliveringMetrics.getPersistentSize(); + } + + @Override + public int getDurableDeliveringCount() { + return deliveringMetrics.getDurableMessageCount(); + } + + @Override + public long getDurableDeliveringSize() { + return deliveringMetrics.getDurablePersistentSize(); } @Override @@ -1239,7 +1306,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { getRefsOperation(tx).addAck(ref); // https://issues.jboss.org/browse/HORNETQ-609 - incDelivering(); + incDelivering(ref); messagesAcknowledged.incrementAndGet(); } @@ -1287,7 +1354,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { resetAllIterators(); } else { - decDelivering(); + decDelivering(reference); } } @@ -1354,8 +1421,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } @Override - public void referenceHandled() { - incDelivering(); + public void referenceHandled(MessageReference ref) { + incDelivering(ref); } @Override @@ -1419,7 +1486,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return iterQueue(flushLimit, filter1, new QueueIterateAction() { @Override public void actMessage(Transaction tx, MessageReference ref) throws Exception { - incDelivering(); + incDelivering(ref); acknowledge(tx, ref, ackReason); refRemoved(ref); } @@ -1539,7 +1606,7 @@ 
public class QueueImpl extends CriticalComponentImpl implements Queue { while (iter.hasNext()) { MessageReference ref = iter.next(); if (ref.getMessage().getMessageID() == messageID) { - incDelivering(); + incDelivering(ref); acknowledge(tx, ref); iter.remove(); refRemoved(ref); @@ -1618,7 +1685,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { while (iter.hasNext()) { MessageReference ref = iter.next(); if (ref.getMessage().getMessageID() == messageID) { - incDelivering(); + incDelivering(ref); expire(ref); iter.remove(); refRemoved(ref); @@ -1644,7 +1711,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { while (iter.hasNext()) { MessageReference ref = iter.next(); if (filter == null || filter.match(ref.getMessage())) { - incDelivering(); + incDelivering(ref); expire(tx, ref); iter.remove(); refRemoved(ref); @@ -1711,7 +1778,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (tx == null) { tx = new TransactionImpl(storageManager); } - incDelivering(); + incDelivering(ref); expired = true; expire(tx, ref); iter.remove(); @@ -1763,7 +1830,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { while (iter.hasNext()) { MessageReference ref = iter.next(); if (ref.getMessage().getMessageID() == messageID) { - incDelivering(); + incDelivering(ref); sendToDeadLetterAddress(null, ref); iter.remove(); refRemoved(ref); @@ -1782,7 +1849,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { while (iter.hasNext()) { MessageReference ref = iter.next(); if (filter == null || filter.match(ref.getMessage())) { - incDelivering(); + incDelivering(ref); sendToDeadLetterAddress(null, ref); iter.remove(); refRemoved(ref); @@ -1804,11 +1871,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (ref.getMessage().getMessageID() == messageID) { iter.remove(); refRemoved(ref); - incDelivering(); + incDelivering(ref); try { move(null, 
toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL); } catch (Exception e) { - decDelivering(); + decDelivering(ref); throw e; } return true; @@ -1836,7 +1903,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { public void actMessage(Transaction tx, MessageReference ref) throws Exception { boolean ignored = false; - incDelivering(); + incDelivering(ref); if (rejectDuplicates) { byte[] duplicateBytes = ref.getMessage().getDuplicateIDBytes(); @@ -1881,7 +1948,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (originalMessageAddress != null) { - incDelivering(); + incDelivering(ref); Long targetQueue = null; if (originalMessageQueue != null && !originalMessageQueue.equals(originalMessageAddress)) { @@ -2065,6 +2132,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private synchronized void internalAddTail(final MessageReference ref) { refAdded(ref); messageReferences.addTail(ref, getPriority(ref)); + pendingMetrics.incrementMetrics(ref); } /** @@ -2076,6 +2144,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { */ private void internalAddHead(final MessageReference ref) { queueMemorySize.addAndGet(ref.getMessageMemoryEstimate()); + pendingMetrics.incrementMetrics(ref); refAdded(ref); int priority = getPriority(ref); @@ -2330,6 +2399,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { protected void refRemoved(MessageReference ref) { queueMemorySize.addAndGet(-ref.getMessageMemoryEstimate()); + pendingMetrics.decrementMetrics(ref); if (ref.isPaged()) { pagedReferences.decrementAndGet(); } @@ -2379,6 +2449,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } addTail(reference, false); pageIterator.remove(); + + //We have to increment this here instead of in the iterator so we have access to the reference from next() + pageSubscription.incrementDeliveredSize(getPersistentSize(reference)); } if 
(logger.isDebugEnabled()) { @@ -2387,7 +2460,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } if (logger.isDebugEnabled()) { - logger.debug("Queue Memory Size after depage on queue=" + this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages, pendingDelivery=" + messageReferences.size() + ", intermediateMessageReferences= " + intermediateMessageReferences.size() + ", queueDelivering=" + deliveringCount.get()); + logger.debug("Queue Memory Size after depage on queue=" + this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages, pendingDelivery=" + messageReferences.size() + ", intermediateMessageReferences= " + intermediateMessageReferences.size() + ", queueDelivering=" + deliveringMetrics.getMessageCount()); } } @@ -2466,7 +2539,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } - decDelivering(); + decDelivering(reference); return true; } @@ -2890,7 +2963,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { public void postAcknowledge(final MessageReference ref) { QueueImpl queue = (QueueImpl) ref.getQueue(); - queue.decDelivering(); + queue.decDelivering(ref); if (ref.isPaged()) { // nothing to be done @@ -2958,7 +3031,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { try { Transaction transaction = new TransactionImpl(storageManager); for (MessageReference reference : refs) { - incDelivering(); // post ack will decrement this, so need to inc + incDelivering(reference); // post ack will decrement this, so need to inc acknowledge(transaction, reference, AckReason.KILLED); } transaction.commit(); @@ -3264,17 +3337,24 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } - private int incDelivering() { - return deliveringCount.incrementAndGet(); + private void incDelivering(MessageReference ref) { + 
deliveringMetrics.incrementMetrics(ref); } - public void decDelivering() { - deliveringCount.decrementAndGet(); + public void decDelivering(final MessageReference reference) { + deliveringMetrics.decrementMetrics(reference); } - @Override - public void decDelivering(int size) { - deliveringCount.addAndGet(-size); + private long getPersistentSize(final MessageReference reference) { + long size = 0; + + try { + size = reference.getPersistentSize() > 0 ? reference.getPersistentSize() : 0; + } catch (Throwable e) { + ActiveMQServerLogger.LOGGER.errorCalculatePersistentSize(e); + } + + return size; } private void configureExpiry(final AddressSettings settings) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueuePendingMessageMetrics.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueuePendingMessageMetrics.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueuePendingMessageMetrics.java new file mode 100644 index 0000000..f6d65d4 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueuePendingMessageMetrics.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; + +import com.google.common.base.Preconditions; + +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.Queue; + +public class QueuePendingMessageMetrics { + + private static final AtomicIntegerFieldUpdater<QueuePendingMessageMetrics> COUNT_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(QueuePendingMessageMetrics.class, "messageCount"); + + private static final AtomicIntegerFieldUpdater<QueuePendingMessageMetrics> DURABLE_COUNT_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(QueuePendingMessageMetrics.class, "durableMessageCount"); + + private static final AtomicLongFieldUpdater<QueuePendingMessageMetrics> SIZE_UPDATER = + AtomicLongFieldUpdater.newUpdater(QueuePendingMessageMetrics.class, "persistentSize"); + + private static final AtomicLongFieldUpdater<QueuePendingMessageMetrics> DURABLE_SIZE_UPDATER = + AtomicLongFieldUpdater.newUpdater(QueuePendingMessageMetrics.class, "durablePersistentSize"); + + private volatile int messageCount; + + private volatile long persistentSize; + + private volatile int durableMessageCount; + + private volatile long durablePersistentSize; + + private final Queue queue; + + public QueuePendingMessageMetrics(final Queue queue) { + Preconditions.checkNotNull(queue); + this.queue = queue; + } + + public void 
incrementMetrics(final MessageReference reference) { + long size = getPersistentSize(reference); + COUNT_UPDATER.incrementAndGet(this); + SIZE_UPDATER.addAndGet(this, size); + if (queue.isDurable() && reference.getMessage().isDurable()) { + DURABLE_COUNT_UPDATER.incrementAndGet(this); + DURABLE_SIZE_UPDATER.addAndGet(this, size); + } + } + + public void decrementMetrics(final MessageReference reference) { + long size = -getPersistentSize(reference); + COUNT_UPDATER.decrementAndGet(this); + SIZE_UPDATER.addAndGet(this, size); + if (queue.isDurable() && reference.getMessage().isDurable()) { + DURABLE_COUNT_UPDATER.decrementAndGet(this); + DURABLE_SIZE_UPDATER.addAndGet(this, size); + } + } + + + + /** + * @return the messageCount + */ + public int getMessageCount() { + return messageCount; + } + + /** + * @param messageCount the messageCount to set + */ + public void setMessageCount(int messageCount) { + this.messageCount = messageCount; + } + + /** + * @return the persistentSize + */ + public long getPersistentSize() { + return persistentSize; + } + + /** + * @param persistentSize the persistentSize to set + */ + public void setPersistentSize(long persistentSize) { + this.persistentSize = persistentSize; + } + + /** + * @return the durableMessageCount + */ + public int getDurableMessageCount() { + return durableMessageCount; + } + + /** + * @param durableMessageCount the durableMessageCount to set + */ + public void setDurableMessageCount(int durableMessageCount) { + this.durableMessageCount = durableMessageCount; + } + + /** + * @return the durablePersistentSize + */ + public long getDurablePersistentSize() { + return durablePersistentSize; + } + + /** + * @param durablePersistentSize the durablePersistentSize to set + */ + public void setDurablePersistentSize(long durablePersistentSize) { + this.durablePersistentSize = durablePersistentSize; + } + + private long getPersistentSize(final MessageReference reference) { + long size = 0; + + try { + size = 
reference.getPersistentSize() > 0 ? reference.getPersistentSize() : 0; + } catch (Throwable e) { + ActiveMQServerLogger.LOGGER.errorCalculatePersistentSize(e); + } + + return size; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java index 6eaba4c..78ec785 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java @@ -50,8 +50,12 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler { // just adding some information to keep it in order accordingly to the initial operations private final TreeSet<RefScheduled> scheduledReferences = new TreeSet<>(new MessageReferenceComparator()); - public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor) { + private final QueuePendingMessageMetrics metrics; + + public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor, + final Queue queue) { this.scheduledExecutor = scheduledExecutor; + this.metrics = new QueuePendingMessageMetrics(queue); } @Override @@ -76,13 +80,27 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler { synchronized (scheduledReferences) { scheduledReferences.add(new RefScheduled(ref, tail)); } + metrics.incrementMetrics(ref); } @Override public int getScheduledCount() { - synchronized (scheduledReferences) { - return scheduledReferences.size(); - } + return metrics.getMessageCount(); + } + + @Override + 
public int getDurableScheduledCount() { + return metrics.getDurableMessageCount(); + } + + @Override + public long getScheduledSize() { + return metrics.getPersistentSize(); + } + + @Override + public long getDurableScheduledSize() { + return metrics.getDurablePersistentSize(); } @Override @@ -109,6 +127,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler { if (filter == null || filter.match(ref.getMessage())) { iter.remove(); refs.add(ref); + metrics.decrementMetrics(ref); } } } @@ -123,6 +142,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler { MessageReference ref = iter.next().getRef(); if (ref.getMessage().getMessageID() == id) { iter.remove(); + metrics.decrementMetrics(ref); return ref; } } @@ -205,6 +225,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler { } iter.remove(); + metrics.decrementMetrics(reference); reference.setScheduledDeliveryTime(0); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 4fda8b3..2cae2c7 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger; import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQException; import 
org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RefCountMessage; @@ -63,7 +64,7 @@ public class ScheduledDeliveryHandlerTest extends Assert { @Test public void testScheduleRandom() throws Exception { - ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null); + ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0)); long nextMessage = 0; long NUMBER_OF_SEQUENCES = 100000; @@ -88,7 +89,7 @@ public class ScheduledDeliveryHandlerTest extends Assert { @Test public void testScheduleSameTimeHeadAndTail() throws Exception { - ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null); + ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0)); long time = System.currentTimeMillis() + 10000; for (int i = 10001; i < 20000; i++) { @@ -110,7 +111,7 @@ public class ScheduledDeliveryHandlerTest extends Assert { @Test public void testScheduleFixedSample() throws Exception { - ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null); + ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0)); addMessage(handler, 0, 48L, true); addMessage(handler, 1, 75L, true); @@ -124,7 +125,7 @@ public class ScheduledDeliveryHandlerTest extends Assert { @Test public void testScheduleWithAddHeads() throws Exception { - ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null); + ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0)); addMessage(handler, 0, 1, true); addMessage(handler, 1, 2, true); @@ -145,7 +146,7 @@ public class ScheduledDeliveryHandlerTest extends Assert { @Test public void testScheduleFixedSampleTailAndHead() throws Exception { - 
ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null); + ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0)); // mix a sequence of tails / heads, but at the end this was supposed to be all sequential addMessage(handler, 1, 48L, true); @@ -191,8 +192,9 @@ public class ScheduledDeliveryHandlerTest extends Assert { private void internalSchedule(ExecutorService executor, ScheduledThreadPoolExecutor scheduler) throws Exception { final int NUMBER_OF_MESSAGES = 200; int NUMBER_OF_THREADS = 20; - final ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(scheduler); + final FakeQueueForScheduleUnitTest fakeQueue = new FakeQueueForScheduleUnitTest(NUMBER_OF_MESSAGES * NUMBER_OF_THREADS); + final ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(scheduler, fakeQueue); final long now = System.currentTimeMillis(); @@ -776,6 +778,11 @@ public class ScheduledDeliveryHandlerTest extends Assert { public void sendBuffer(ByteBuf buffer, int count) { } + + @Override + public long getPersistentSize() throws ActiveMQException { + return 0; + } } public class FakeQueueForScheduleUnitTest extends CriticalComponentImpl implements Queue { @@ -1017,12 +1024,52 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override + public long getPersistentSize() { + return 0; + } + + @Override + public long getDurableMessageCount() { + return 0; + } + + @Override + public long getDurablePersistentSize() { + return 0; + } + + @Override public int getDeliveringCount() { return 0; } @Override - public void referenceHandled() { + public long getDeliveringSize() { + return 0; + } + + @Override + public int getDurableDeliveringCount() { + return 0; + } + + @Override + public long getDurableDeliveringSize() { + return 0; + } + + @Override + public int getDurableScheduledCount() { + return 0; + } + + @Override + public long getDurableScheduledSize() { + return 0; + } + 
+ @Override + public void referenceHandled(MessageReference ref) { } @@ -1032,6 +1079,11 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override + public long getScheduledSize() { + return 0; + } + + @Override public List<MessageReference> getScheduledMessages() { return null; } @@ -1310,7 +1362,6 @@ public class ScheduledDeliveryHandlerTest extends Assert { public SimpleString getUser() { return null; } - @Override public boolean isLastValue() { return false; @@ -1326,13 +1377,5 @@ public class ScheduledDeliveryHandlerTest extends Assert { } - @Override - public void decDelivering(int size) { - - } - - - - } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java index b256eb9..3a9a785 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java @@ -587,7 +587,7 @@ public class TransactionImplTest extends ActiveMQTestBase { } @Override - public long storePageCounter(long txID, long queueID, long value) throws Exception { + public long storePageCounter(long txID, long queueID, long value, long size) throws Exception { return 0; } @@ -612,12 +612,12 @@ public class TransactionImplTest extends ActiveMQTestBase { } @Override - public long storePageCounterInc(long txID, long queueID, int add) throws Exception { + public long storePageCounterInc(long txID, long queueID, int add, long size) throws Exception { return 0; } @Override - public long storePageCounterInc(long queueID, int 
add) throws Exception { + public long storePageCounterInc(long queueID, int add, long size) throws Exception { return 0; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/compatibility-tests/src/main/resources/metrics/queueMetrics.groovy ---------------------------------------------------------------------- diff --git a/tests/compatibility-tests/src/main/resources/metrics/queueMetrics.groovy b/tests/compatibility-tests/src/main/resources/metrics/queueMetrics.groovy new file mode 100644 index 0000000..4ef4425 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/metrics/queueMetrics.groovy @@ -0,0 +1,37 @@ +package metrics +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import org.apache.activemq.artemis.tests.compatibility.GroovyRun; +import javax.management.MBeanServer; +import javax.management.MBeanServerInvocationHandler; +import javax.management.ObjectName; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; +import org.apache.activemq.artemis.api.core.management.QueueControl; + +//validate metrics are recovered +Object[] queueControls = server.getJMSServerManager().getActiveMQServer().getManagementService().getResources(QueueControl.class); +for (Object o : queueControls) { + QueueControl c = (QueueControl) o; + GroovyRun.assertTrue(c.getPersistentSize() > 0); + GroovyRun.assertTrue(c.getDurablePersistentSize() > 0); + GroovyRun.assertEquals(16l, c.getMessageCount()); + GroovyRun.assertEquals(16l, c.getDurableMessageCount()); + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy ---------------------------------------------------------------------- diff --git a/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy b/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy index fe58505..78d1241 100644 --- a/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy +++ b/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy @@ -31,8 +31,10 @@ String id = arg[1]; String type = arg[2]; String producer = arg[3]; String consumer = arg[4]; +String globalMaxSize = arg[5]; println("type = " + type); +println("globalMaxSize = " + globalMaxSize); configuration = new ConfigurationImpl(); configuration.setJournalType(JournalType.NIO); @@ -44,6 +46,10 @@ configuration.setPersistenceEnabled(persistent); try { if (!type.startsWith("ARTEMIS-1")) { configuration.addAddressesSetting("#", new 
AddressSettings().setAutoCreateAddresses(true)); + if (globalMaxSize != null) { + configuration.getAddressesSettings().get("#").setPageSizeBytes(globalMaxSize); + configuration.setGlobalMaxSize(Long.parseLong(globalMaxSize)); + } } } catch (Throwable e) { // need to ignore this for 1.4 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java index 40da24c..958db27 100644 --- a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java @@ -17,6 +17,9 @@ package org.apache.activemq.artemis.tests.compatibility; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR; + import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -28,9 +31,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; -import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR; - /** * To run this test on the IDE and debug it, run the compatibility-tests through a command line once: * @@ -105,5 +105,42 @@ public class JournalCompatibilityTest extends VersionedBaseTest { evaluate(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveMessages"); } + /** + * Test that the 
server starts properly using an old journal even though persistent size + * metrics were not originally stored + */ + @Test + public void testSendReceiveQueueMetrics() throws Throwable { + setVariable(senderClassloader, "persistent", true); + startServer(serverFolder.getRoot(), senderClassloader, "journalTest"); + evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages"); + stopServer(senderClassloader); + + setVariable(receiverClassloader, "persistent", true); + startServer(serverFolder.getRoot(), receiverClassloader, "journalTest"); + + setVariable(receiverClassloader, "latch", null); + evaluate(receiverClassloader, "metrics/queueMetrics.groovy", server, receiver, "receiveMessages"); + } + + /** + * Test that the metrics are recovered when paging. Even though the paging counts won't + * be persisted in the journal, the server should still start properly. The persistent sizes + * will be recovered when the messages are depaged + */ + @Test + public void testSendReceiveSizeQueueMetricsPaging() throws Throwable { + setVariable(senderClassloader, "persistent", true); + //Set max size to 1 to cause messages to immediately go to the paging store + startServer(serverFolder.getRoot(), senderClassloader, "journalTest", Long.toString(1)); + evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages"); + stopServer(senderClassloader); + + setVariable(receiverClassloader, "persistent", true); + startServer(serverFolder.getRoot(), receiverClassloader, "journalTest", Long.toString(1)); + + + evaluate(receiverClassloader, "metrics/queueMetrics.groovy", server, receiver, "receiveMessages"); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java ---------------------------------------------------------------------- diff --git
a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java index e2b9648..9001180 100644 --- a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java @@ -189,6 +189,10 @@ public abstract class VersionedBaseTest { } public void startServer(File folder, ClassLoader loader, String serverName) throws Throwable { + startServer(folder, loader, serverName, null); + } + + public void startServer(File folder, ClassLoader loader, String serverName, String globalMaxSize) throws Throwable { folder.mkdirs(); System.out.println("Folder::" + folder); @@ -202,9 +206,8 @@ public abstract class VersionedBaseTest { scriptToUse = "servers/hornetqServer.groovy"; } - evaluate(loader, scriptToUse, folder.getAbsolutePath(), serverName, server, sender, receiver); + evaluate(loader, scriptToUse, folder.getAbsolutePath(), serverName, server, sender, receiver, globalMaxSize); } - public void stopServer(ClassLoader loader) throws Throwable { execute(loader, "server.stop()"); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java index 078c397..511d476 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java +++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java @@ -817,5 +817,10 @@ public class AcknowledgeTest extends ActiveMQTestBase { public Map<String, Object> toPropertyMap() { return null; } + + @Override + public long getPersistentSize() throws ActiveMQException { + return 0; + } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java index d37d134..fe9ba17 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java @@ -668,8 +668,8 @@ public class SendAckFailTest extends ActiveMQTestBase { } @Override - public long storePageCounter(long txID, long queueID, long value) throws Exception { - return manager.storePageCounter(txID, queueID, value); + public long storePageCounter(long txID, long queueID, long value, long size) throws Exception { + return manager.storePageCounter(txID, queueID, value, size); } @Override @@ -693,13 +693,13 @@ public class SendAckFailTest extends ActiveMQTestBase { } @Override - public long storePageCounterInc(long txID, long queueID, int add) throws Exception { - return manager.storePageCounterInc(txID, queueID, add); + public long storePageCounterInc(long txID, long queueID, int add, long size) throws Exception { + return manager.storePageCounterInc(txID, queueID, add, size); } @Override - public long storePageCounterInc(long queueID, int add) throws Exception { - 
return manager.storePageCounterInc(queueID, add); + public long storePageCounterInc(long queueID, int add, long size) throws Exception { + return manager.storePageCounterInc(queueID, add, size); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java index 8836ba8..9967e76 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java @@ -122,6 +122,21 @@ public abstract class ManagementTestBase extends ActiveMQTestBase { return control.getMessageCount(); } + protected long getDurableMessageCount(QueueControl control) throws Exception { + control.flushExecutor(); + return control.getDurableMessageCount(); + } + + protected long getMessageSize(QueueControl control) throws Exception { + control.flushExecutor(); + return control.getPersistentSize(); + } + + protected long getDurableMessageSize(QueueControl control) throws Exception { + control.flushExecutor(); + return control.getDurablePersistentSize(); + } + protected long getMessagesAdded(QueueControl control) throws Exception { control.flushExecutor(); return control.getMessagesAdded();