This is an automated email from the ASF dual-hosted git repository. orudyy pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push: new 94d5373 QPID-8559: Add debug logging for flow to disk conditions 94d5373 is described below commit 94d5373a9d402827fd761e2b190fa3df435f95ed Author: Alex Rudyy <oru...@apache.org> AuthorDate: Sun Aug 22 20:18:30 2021 +0100 QPID-8559: Add debug logging for flow to disk conditions --- .../org/apache/qpid/server/model/BrokerImpl.java | 36 +++++++++++++ .../server/virtualhost/AbstractVirtualHost.java | 59 ++++++++++++++++++++++ .../runtime/Java-Broker-Runtime-Flow-To-Disk.xml | 50 ++++++++++++++++++ 3 files changed, 145 insertions(+) diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java index d2bd9d4..2d61b11 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java +++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java @@ -43,6 +43,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; @@ -96,6 +97,7 @@ import org.apache.qpid.server.virtualhost.VirtualHostPropertiesNodeCreator; public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<BrokerImpl> { private static final Logger LOGGER = LoggerFactory.getLogger(BrokerImpl.class); + private static final Logger DIRECT_MEMORY_USAGE_LOGGER = LoggerFactory.getLogger("org.apache.qpid.server.directMemory.broker"); private static final Pattern MODEL_VERSION_PATTERN = Pattern.compile("^\\d+\\.\\d+$"); @@ -146,6 +148,8 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker< private volatile ScheduledFuture<?> _statisticsReportingFuture; private long _housekeepingCheckPeriod; + private final AtomicBoolean 
_directMemoryExceedsThresholdReported = new AtomicBoolean(); + @ManagedObjectFactoryConstructor public BrokerImpl(Map<String, Object> attributes, SystemConfig parent) @@ -571,6 +575,7 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker< @Override public synchronized void assignTargetSizes() { + reportDirectMemoryAboveThresholdIfExceeded(); LOGGER.debug("Assigning target sizes based on total target {}", _flowToDiskThreshold); long totalSize = 0l; Collection<VirtualHostNode<?>> vhns = getVirtualHostNodes(); @@ -609,6 +614,7 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker< } entry.getKey().setTargetSize(size); } + reportDirectMemoryBelowThresholdIfReached(); } @Override @@ -1352,4 +1358,34 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker< } return Collections.<Principal>unmodifiableSet(currentSubject.getPrincipals(GroupPrincipal.class)); } + + private void reportDirectMemoryBelowThresholdIfReached() /* debug-logs once when allocated direct memory is back at or below the flow-to-disk threshold after an excess was reported */ + { + if (DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled()) + { + final long allocatedDirectMemorySize = QpidByteBuffer.getAllocatedDirectMemorySize(); + if (allocatedDirectMemorySize <= _flowToDiskThreshold /* usage is no longer above the threshold */ + && _directMemoryExceedsThresholdReported.compareAndSet(true, false)) + { + DIRECT_MEMORY_USAGE_LOGGER.debug("Direct memory threshold ({}) maintained : {}", + _flowToDiskThreshold, + allocatedDirectMemorySize); + } + } + } + + private void reportDirectMemoryAboveThresholdIfExceeded() /* debug-logs once when allocated direct memory exceeds the flow-to-disk threshold */ + { + if (DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled()) + { + final long allocatedDirectMemorySize = QpidByteBuffer.getAllocatedDirectMemorySize(); + if (allocatedDirectMemorySize > _flowToDiskThreshold + && _directMemoryExceedsThresholdReported.compareAndSet(false, true)) + { + DIRECT_MEMORY_USAGE_LOGGER.debug("Direct memory threshold ({}) exceeded : {}", + _flowToDiskThreshold, + allocatedDirectMemorySize); + } + } + } } diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index ec0d776..0b1d1de 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -77,6 +77,7 @@ import com.google.common.util.concurrent.SettableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.updater.Task; import org.apache.qpid.server.configuration.updater.TaskExecutor; @@ -169,6 +170,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte private static final Logger LOGGER = LoggerFactory.getLogger(AbstractVirtualHost.class); + private static final Logger DIRECT_MEMORY_USAGE_LOGGER = LoggerFactory.getLogger("org.apache.qpid.server.directMemory.virtualhost"); private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5; @@ -219,6 +221,8 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte private volatile boolean _createDefaultExchanges; + private final AtomicBoolean _directMemoryExceedsTargetReported = new AtomicBoolean(); + private final AccessControl _systemUserAllowed = new SubjectFixedResultAccessControl(new ResultCalculator() { @Override @@ -1712,6 +1716,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte _messagesOut.incrementAndGet(); _bytesOut.addAndGet(messageSize); _broker.registerMessageDelivered(messageSize); + reportDirectMemoryBelowTargetIfReached(); } @Override @@ -1725,6 +1730,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte { _maximumMessageSize.compareAndSet(hwm, messageSize); } + 
reportDirectMemoryAboveTargetIfExceeded(); } @Override @@ -2136,6 +2142,8 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte if (isOverTargetSize()) { long currentTargetSize = _targetSize.get(); + reportDirectMemoryAboveTargetIfExceeded(currentTargetSize, + AbstractVirtualHost.this.getInMemoryMessageSize()); List<QueueEntryIterator> queueIterators = new ArrayList<>(); for (Queue<?> q : getChildren(Queue.class)) { @@ -2181,6 +2189,8 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte cyclicIterators.remove(); } } + reportDirectMemoryBelowTargetIfReached(currentTargetSize, /* same target the excess was reported against */ + AbstractVirtualHost.this.getInMemoryMessageSize()); } } } @@ -2590,6 +2600,9 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte public void setTargetSize(final long targetSize) { _targetSize.set(targetSize); + final long inMemoryMessageSize = getInMemoryMessageSize(); + reportDirectMemoryAboveTargetIfExceeded(targetSize, inMemoryMessageSize); + reportDirectMemoryBelowTargetIfReached(targetSize, inMemoryMessageSize); } @Override @@ -3423,4 +3436,50 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte String.valueOf(outcome), attributesAsString(attributes))); } + + private void reportDirectMemoryAboveTargetIfExceeded() + { + if (DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled()) + { + reportDirectMemoryAboveTargetIfExceeded(getTargetSize(), getInMemoryMessageSize()); + } + } + + private void reportDirectMemoryBelowTargetIfReached() + { + if (DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled()) + { + reportDirectMemoryBelowTargetIfReached(getTargetSize(), getInMemoryMessageSize()); + } + } + + private void reportDirectMemoryBelowTargetIfReached(final long currentTargetSize, final long inMemoryMessageSize) /* debug-logs once when in-memory message size is within the target and allocated direct memory is within the broker flow-to-disk threshold */ + { + if (DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled() + && inMemoryMessageSize <= currentTargetSize + && QpidByteBuffer.getAllocatedDirectMemorySize() <= _broker.getFlowToDiskThreshold() 
+ && _directMemoryExceedsTargetReported.compareAndSet(true, false)) + { + DIRECT_MEMORY_USAGE_LOGGER.debug( + "VirtualHost '{}' direct memory allocation threshold ({}) maintained : {} bytes. Flow to disk stopped.", + getName(), + currentTargetSize, + inMemoryMessageSize); + } + } + + private void reportDirectMemoryAboveTargetIfExceeded(final long currentTargetSize, final long inMemoryMessageSize) /* debug-logs once when in-memory message size exceeds the target or allocated direct memory exceeds the broker flow-to-disk threshold */ + { + if (DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled() + && (inMemoryMessageSize > currentTargetSize + || QpidByteBuffer.getAllocatedDirectMemorySize() > _broker.getFlowToDiskThreshold()) + && _directMemoryExceedsTargetReported.compareAndSet(false, true)) + { + DIRECT_MEMORY_USAGE_LOGGER.debug( + "VirtualHost '{}' direct memory allocation threshold ({}) exceeded : {} bytes. Flow to disk enforced.", + getName(), + currentTargetSize, + inMemoryMessageSize); + } + } } diff --git a/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Flow-To-Disk.xml b/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Flow-To-Disk.xml index 01dbf1a..09bdb77 100644 --- a/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Flow-To-Disk.xml +++ b/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Flow-To-Disk.xml @@ -39,4 +39,54 @@ <para>Flow to disk is configured by Broker context variable <literal>broker.flowToDiskThreshold</literal>. It is expressed as a size in bytes and defaults to 75% of the JVM maximum heap size.</para> + <section role="h2" xml:id="Java-Broker-Runtime-Flow-To-Disk-Monitoring"> + <title>Flow to Disk Monitoring</title> + <para>A number of statistics attributes are available on the <literal>Broker</literal> to allow monitoring + of the amount of utilized direct memory by the enqueued messages. + </para> + <para>The total amount of allocated direct memory by the <literal>Broker</literal> can be determined by + checking <literal>Broker</literal> statistics <literal>usedDirectMemorySize</literal>. 
There is another + <literal>Broker</literal> level statistics <literal>directMemoryTotalCapacity</literal> to get the total amount + of allocated direct memory. Usually, the values reported by both statistics attributes + <literal>usedDirectMemorySize</literal> and <literal>directMemoryTotalCapacity</literal> are the same + or do not differ much. + </para> + <para>The direct memory consumed by the <literal>VirtualHost</literal> messages is reported as + <literal>VirtualHost</literal> statistics <literal>inMemoryMessageSize</literal>. The current value of + <literal>VirtualHost</literal> direct memory threshold is exposed with statistics attribute + <literal>inMemoryMessageThreshold</literal>. When the value of <literal>inMemoryMessageSize</literal> is + greater than <literal>inMemoryMessageThreshold</literal>, the flow to disk is triggered to bring the amount of + direct memory consumed by the <literal>VirtualHost</literal> messages in line with the + <literal>inMemoryMessageThreshold</literal>. + </para> + </section> + <section role="h2" xml:id="Java-Broker-Runtime-Flow-To-Disk-Logging"> + <title>Flow to Disk Logging</title> + <para>The <literal>Flow to Disk</literal> events are not reported as operational logs or + <literal>INFO</literal> logs due to quite frequent triggering of <literal>Flow to Disk</literal> + for messaging use cases requiring holding messages on the <literal>Broker</literal> side for some time. + As a result, the <literal>Flow to Disk</literal> logs can quickly dominate the broker logs and cause unnecessary + disk consumption. + </para> + <para>Though, if required, the <literal>Flow to Disk</literal> DEBUG logs can be enabled by adding + the following logging rule into the corresponding <literal>Broker</literal> logger. 
+ <example> + <title>Flow to Disk logging rule</title> + <programlisting> + { + "name" : "DirectMemory", + "type" : "NameAndLevel", + "level" : "DEBUG", + "loggerName" : "org.apache.qpid.server.directMemory.*" + } + </programlisting> + </example> + </para> + <para>Please note that the logger <literal>org.apache.qpid.server.directMemory.broker</literal> + is used by the <literal>Broker</literal> to report conditions when direct memory utilization exceeds the predefined + <literal>Broker</literal> threshold, whilst the logger <literal>org.apache.qpid.server.directMemory.virtualhost</literal> + is used to report conditions when direct memory utilization by the <literal>VirtualHost</literal> + messages exceeds the current value of the <literal>VirtualHost</literal> threshold. + </para> + </section> </section> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org