This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/main by this push:
     new 94d5373  QPID-8559: Add debug logging for flow to disk conditions
94d5373 is described below

commit 94d5373a9d402827fd761e2b190fa3df435f95ed
Author: Alex Rudyy <oru...@apache.org>
AuthorDate: Sun Aug 22 20:18:30 2021 +0100

    QPID-8559: Add debug logging for flow to disk conditions
---
 .../org/apache/qpid/server/model/BrokerImpl.java   | 36 +++++++++++++
 .../server/virtualhost/AbstractVirtualHost.java    | 59 ++++++++++++++++++++++
 .../runtime/Java-Broker-Runtime-Flow-To-Disk.xml   | 50 ++++++++++++++++++
 3 files changed, 145 insertions(+)

diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java 
b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
index d2bd9d4..2d61b11 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
@@ -43,6 +43,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
 
@@ -96,6 +97,7 @@ import 
org.apache.qpid.server.virtualhost.VirtualHostPropertiesNodeCreator;
 public class BrokerImpl extends AbstractContainer<BrokerImpl> implements 
Broker<BrokerImpl>
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(BrokerImpl.class);
+    private static final Logger DIRECT_MEMORY_USAGE_LOGGER = 
LoggerFactory.getLogger("org.apache.qpid.server.directMemory.broker");
 
     private static final Pattern MODEL_VERSION_PATTERN = 
Pattern.compile("^\\d+\\.\\d+$");
 
@@ -146,6 +148,8 @@ public class BrokerImpl extends 
AbstractContainer<BrokerImpl> implements Broker<
     private volatile ScheduledFuture<?> _statisticsReportingFuture;
     private long _housekeepingCheckPeriod;
 
+    private final AtomicBoolean _directMemoryExceedsThresholdReported = new 
AtomicBoolean();
+
     @ManagedObjectFactoryConstructor
     public BrokerImpl(Map<String, Object> attributes,
                       SystemConfig parent)
@@ -571,6 +575,7 @@ public class BrokerImpl extends 
AbstractContainer<BrokerImpl> implements Broker<
     @Override
     public synchronized void assignTargetSizes()
     {
+        reportDirectMemoryAboveThresholdIfExceeded();
         LOGGER.debug("Assigning target sizes based on total target {}", 
_flowToDiskThreshold);
         long totalSize = 0l;
         Collection<VirtualHostNode<?>> vhns = getVirtualHostNodes();
@@ -609,6 +614,7 @@ public class BrokerImpl extends 
AbstractContainer<BrokerImpl> implements Broker<
             }
             entry.getKey().setTargetSize(size);
         }
+        reportDirectMemoryBelowThresholdIfReached();
     }
 
     @Override
@@ -1352,4 +1358,34 @@ public class BrokerImpl extends 
AbstractContainer<BrokerImpl> implements Broker<
         }
         return 
Collections.<Principal>unmodifiableSet(currentSubject.getPrincipals(GroupPrincipal.class));
     }
+
+    private void reportDirectMemoryBelowThresholdIfReached()
+    {
+        if (DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled())
+        {
+            final long allocatedDirectMemorySize = 
QpidByteBuffer.getAllocatedDirectMemorySize();
+            if (allocatedDirectMemorySize >= _flowToDiskThreshold
+                && _directMemoryExceedsThresholdReported.compareAndSet(true, 
false))
+            {
+                DIRECT_MEMORY_USAGE_LOGGER.debug("Direct memory threshold ({}) 
maintained : {}",
+                                                 _flowToDiskThreshold,
+                                                 allocatedDirectMemorySize);
+            }
+        }
+    }
+
+    private void reportDirectMemoryAboveThresholdIfExceeded()
+    {
+        if (DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled())
+        {
+            final long allocatedDirectMemorySize = 
QpidByteBuffer.getAllocatedDirectMemorySize();
+            if (allocatedDirectMemorySize > _flowToDiskThreshold
+                && _directMemoryExceedsThresholdReported.compareAndSet(false, 
true))
+            {
+                DIRECT_MEMORY_USAGE_LOGGER.debug("Direct memory threshold ({}) 
exceeded : {}",
+                                                 _flowToDiskThreshold,
+                                                 allocatedDirectMemorySize);
+            }
+        }
+    }
 }
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
 
b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index ec0d776..0b1d1de 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -77,6 +77,7 @@ import com.google.common.util.concurrent.SettableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.configuration.updater.Task;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
@@ -169,6 +170,7 @@ public abstract class AbstractVirtualHost<X extends 
AbstractVirtualHost<X>> exte
 
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractVirtualHost.class);
+    private static final Logger DIRECT_MEMORY_USAGE_LOGGER = 
LoggerFactory.getLogger("org.apache.qpid.server.directMemory.virtualhost");
 
     private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5;
 
@@ -219,6 +221,8 @@ public abstract class AbstractVirtualHost<X extends 
AbstractVirtualHost<X>> exte
 
     private volatile boolean _createDefaultExchanges;
 
+    private final AtomicBoolean _directMemoryExceedsTargetReported = new 
AtomicBoolean();
+
     private final AccessControl _systemUserAllowed = new 
SubjectFixedResultAccessControl(new ResultCalculator()
     {
         @Override
@@ -1712,6 +1716,7 @@ public abstract class AbstractVirtualHost<X extends 
AbstractVirtualHost<X>> exte
         _messagesOut.incrementAndGet();
         _bytesOut.addAndGet(messageSize);
         _broker.registerMessageDelivered(messageSize);
+        reportDirectMemoryBelowTargetIfReached();
     }
 
     @Override
@@ -1725,6 +1730,7 @@ public abstract class AbstractVirtualHost<X extends 
AbstractVirtualHost<X>> exte
         {
             _maximumMessageSize.compareAndSet(hwm, messageSize);
         }
+        reportDirectMemoryAboveTargetIfExceeded();
     }
 
     @Override
@@ -2136,6 +2142,8 @@ public abstract class AbstractVirtualHost<X extends 
AbstractVirtualHost<X>> exte
             if (isOverTargetSize())
             {
                 long currentTargetSize = _targetSize.get();
+                reportDirectMemoryAboveTargetIfExceeded(currentTargetSize,
+                                                        
AbstractVirtualHost.this.getInMemoryMessageSize());
                 List<QueueEntryIterator> queueIterators = new ArrayList<>();
                 for (Queue<?> q : getChildren(Queue.class))
                 {
@@ -2181,6 +2189,8 @@ public abstract class AbstractVirtualHost<X extends 
AbstractVirtualHost<X>> exte
                         cyclicIterators.remove();
                     }
                 }
+                reportDirectMemoryBelowTargetIfReached(cumulativeSize,
+                                                       
AbstractVirtualHost.this.getInMemoryMessageSize());
             }
         }
     }
@@ -2590,6 +2600,9 @@ public abstract class AbstractVirtualHost<X extends 
AbstractVirtualHost<X>> exte
     public void setTargetSize(final long targetSize)
     {
         _targetSize.set(targetSize);
+        final long inMemoryMessageSize = getInMemoryMessageSize();
+        reportDirectMemoryAboveTargetIfExceeded(targetSize, 
inMemoryMessageSize);
+        reportDirectMemoryBelowTargetIfReached(targetSize, 
inMemoryMessageSize);
     }
 
     @Override
@@ -3423,4 +3436,50 @@ public abstract class AbstractVirtualHost<X extends 
AbstractVirtualHost<X>> exte
                                                         
String.valueOf(outcome),
                                                         
attributesAsString(attributes)));
     }
+
+    private void reportDirectMemoryAboveTargetIfExceeded()
+    {
+        if (DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled())
+        {
+            reportDirectMemoryAboveTargetIfExceeded(getTargetSize(), 
getInMemoryMessageSize());
+        }
+    }
+
+    private void reportDirectMemoryBelowTargetIfReached()
+    {
+        if (DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled())
+        {
+            reportDirectMemoryBelowTargetIfReached(getTargetSize(), 
getInMemoryMessageSize());
+        }
+    }
+
+    private void reportDirectMemoryBelowTargetIfReached(final long 
currentTargetSize, final long inMemoryMessageSize)
+    {
+        if (DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled()
+            && inMemoryMessageSize <= currentTargetSize
+            && QpidByteBuffer.getAllocatedDirectMemorySize() <= 
_broker.getFlowToDiskThreshold()
+            && _directMemoryExceedsTargetReported.compareAndSet(true, false))
+        {
+            DIRECT_MEMORY_USAGE_LOGGER.debug(
+                    "VirtualHost '{}' direct memory allocation threshold ({}) 
maintained : {} bytes. Flow to disk stopped.",
+                    getName(),
+                    currentTargetSize,
+                    inMemoryMessageSize);
+        }
+    }
+
+    private void reportDirectMemoryAboveTargetIfExceeded(final long 
currentTargetSize, final long inMemoryMessageSize)
+    {
+        if (DIRECT_MEMORY_USAGE_LOGGER.isDebugEnabled()
+            && (inMemoryMessageSize > currentTargetSize
+                || QpidByteBuffer.getAllocatedDirectMemorySize() > 
_broker.getFlowToDiskThreshold())
+            && _directMemoryExceedsTargetReported.compareAndSet(false, true))
+        {
+            DIRECT_MEMORY_USAGE_LOGGER.debug(
+                    "VirtualHost '{}' direct memory allocation threshold ({}) 
exceeded : {} bytes. Flow to disk enforced.",
+                    getName(),
+                    currentTargetSize,
+                    inMemoryMessageSize);
+        }
+    }
 }
diff --git 
a/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Flow-To-Disk.xml 
b/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Flow-To-Disk.xml
index 01dbf1a..09bdb77 100644
--- a/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Flow-To-Disk.xml
+++ b/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Flow-To-Disk.xml
@@ -39,4 +39,54 @@
   <para>Flow to disk is configured by Broker context variable
       <literal>broker.flowToDiskThreshold</literal>. It is expressed as a size 
in bytes and defaults
     to 75% of the JVM maximum heap size.</para>
+  <section role="h2" xml:id="Java-Broker-Runtime-Flow-To-Disk-Monitoring">
+    <title>Flow to Disk Monitoring</title>
+    <para>A number of statistics attributes are available on the 
<literal>Broker</literal> to allow monitoring
+      of the amount of utilized direct memory by the enqueued messages.
+    </para>
+    <para>The total amount of allocated direct memory by the 
<literal>Broker</literal> can be determined by
+      checking <literal>Broker</literal> statistics 
<literal>usedDirectMemorySize</literal>. There is another
+      <literal>Broker</literal> level statistics 
<literal>directMemoryTotalCapacity</literal> to get the total amount
+      of allocated direct memory. Usually, the values reported by both 
statistics attributes
+      <literal>usedDirectMemorySize</literal> and 
<literal>directMemoryTotalCapacity</literal> are the same
+      or do not differ much.
+    </para>
+    <para>The direct memory consumed by the <literal>VirtualHost</literal> 
messages is reported as
+      <literal>VirtualHost</literal> statistics 
<literal>inMemoryMessageSize</literal>. The current value of
+      <literal>VirtualHost</literal> direct memory threshold is exposed with 
statistics attribute
+      <literal>inMemoryMessageThreshold</literal>. When the value of 
<literal>inMemoryMessageSize</literal> is
+      greater than <literal>inMemoryMessageThreshold</literal>, the flow to 
disk is triggered to bring the amount of
+      direct memory consumed by the <literal>VirtualHost</literal> messages 
in-line with the
+      <literal>inMemoryMessageThreshold</literal>.
+    </para>
+  </section>
+  <section role="h2" xml:id="Java-Broker-Runtime-Flow-To-Disk-Logging">
+    <title>Flow to Disk Logging</title>
+    <para>The <literal>Flow to Disk</literal> events are not reported as 
operational logs or
+      <literal>INFO</literal> logs due to quite frequent triggering of 
<literal>Flow to Disk</literal>
+      for messaging use cases requiring holding messages on the 
<literal>Broker</literal> side for some time.
+      As result, the <literal>Flow to Disk</literal> logs can quickly dominate 
the broker logs and cause unnecessary
+      disk consumption.
+    </para>
+    <para>Though, if required, the <literal>Flow to Disk</literal> DEBUG logs 
can be enabled by adding
+      the following logging rule into the corresponding 
<literal>Broker</literal> logger.
+      <example>
+        <title>Flow to Disk logging rule</title>
+        <programlisting>
+          {
+            "name" : "DirectMemory",
+            "type" : "NameAndLevel",
+            "level" : "DEBUG",
+            "loggerName" : "org.apache.qpid.server.directMemory.*"
+          }
+        </programlisting>
+      </example>
+    </para>
+    <para>Please note, that the logger 
<literal>org.apache.qpid.server.directMemory.broker</literal>
+      is used by the <literal>Broker</literal> to report conditions when 
direct memory utilization exceeds the pred-defined
+      <literal>Broker</literal> threshold, whilst the logger 
<literal>org.apache.qpid.server.directMemory.virtualhost</literal>
+      is used to report conditions when direct memory utilization by the 
<literal>VirtualHost</literal>
+      messages exceeds the current value of the <literal>VirtualHost</literal> 
threshold.
+    </para>
+  </section>
 </section>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to