QPID-7763: [Java Broker] Flow to disk if allocated direct memory exceeds broker-wide broker.flowToDiskThreshold

(cherry picked from commit 07ea26e7b8fdc24621fcde04949df633186fa5a0)


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/2b91b441
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/2b91b441
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/2b91b441

Branch: refs/heads/6.0.x
Commit: 2b91b4419841fb84bb97028d6995f7f125570412
Parents: aad81a6
Author: Alex Rudyy <[email protected]>
Authored: Wed May 3 17:13:52 2017 +0100
Committer: Alex Rudyy <[email protected]>
Committed: Wed May 3 17:56:40 2017 +0100

----------------------------------------------------------------------
 .../org/apache/qpid/server/model/Broker.java    |  4 ++++
 .../server/model/adapter/BrokerAdapter.java     | 24 +++++++++++++-------
 .../apache/qpid/server/queue/AbstractQueue.java | 12 ++++++++--
 .../org/apache/qpid/bytebuffer/BufferPool.java  | 17 ++++++++++++--
 .../qpid/bytebuffer/PooledByteBufferRef.java    |  8 +++++++
 .../apache/qpid/bytebuffer/QpidByteBuffer.java  | 15 ++++++++++++
 6 files changed, 68 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2b91b441/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index 2c09dc9..059eded 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -280,4 +280,8 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
     void assignTargetSizes();
 
     int getNetworkBufferSize();
+
+    @DerivedAttribute(description = "Threshold direct memory size (in bytes) at which the Broker will start flowing incoming messages to disk.")
+    long getFlowToDiskThreshold();
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2b91b441/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
index 8ded399..17defb8 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
@@ -131,6 +131,7 @@ public class BrokerAdapter extends 
AbstractConfiguredObject<BrokerAdapter> imple
     private final BufferPoolMXBean _bufferPoolMXBean;
     private final List<String> _jvmArguments;
     private String _documentationUrl;
+    private long _flowToDiskThreshold;
 
     @ManagedObjectFactoryConstructor
     public BrokerAdapter(Map<String, Object> attributes,
@@ -507,8 +508,7 @@ public class BrokerAdapter extends 
AbstractConfiguredObject<BrokerAdapter> imple
     @Override
     public synchronized void assignTargetSizes()
     {
-        long totalTarget = getContextValue(Long.class, 
BROKER_FLOW_TO_DISK_THRESHOLD);
-        LOGGER.debug("Assigning target sizes based on total target {}", 
totalTarget);
+        LOGGER.debug("Assigning target sizes based on total target {}", 
_flowToDiskThreshold);
         long totalSize = 0l;
         Collection<VirtualHostNode<?>> vhns = getVirtualHostNodes();
         Map<VirtualHost<?, ?, ?>, Long> vhs = new HashMap<>();
@@ -523,20 +523,20 @@ public class BrokerAdapter extends 
AbstractConfiguredObject<BrokerAdapter> imple
             }
         }
 
-        if (totalSize > totalTarget && 
!_totalMessageSizeExceedThresholdReported)
+        if (totalSize > _flowToDiskThreshold && 
!_totalMessageSizeExceedThresholdReported)
         {
-            _eventLogger.message(BrokerMessages.FLOW_TO_DISK_ACTIVE(totalSize 
/ 1024, totalTarget / 1024));
+            _eventLogger.message(BrokerMessages.FLOW_TO_DISK_ACTIVE(totalSize 
/ 1024, _flowToDiskThreshold / 1024));
             _totalMessageSizeExceedThresholdReported = true;
             _totalMessageSizeWithinThresholdReported = false;
         }
-        else if (totalSize <= totalTarget && 
!_totalMessageSizeWithinThresholdReported)
+        else if (totalSize <= _flowToDiskThreshold && 
!_totalMessageSizeWithinThresholdReported)
         {
-            
_eventLogger.message(BrokerMessages.FLOW_TO_DISK_INACTIVE(totalSize / 1024, 
totalTarget / 1024));
+            
_eventLogger.message(BrokerMessages.FLOW_TO_DISK_INACTIVE(totalSize / 1024, 
_flowToDiskThreshold / 1024));
             _totalMessageSizeWithinThresholdReported = true;
             _totalMessageSizeExceedThresholdReported = false;
         }
 
-        final long proportionalShare = (long) ((double) totalTarget / (double) 
vhs.size());
+        final long proportionalShare = (long) ((double) _flowToDiskThreshold / 
(double) vhs.size());
         for (Map.Entry<VirtualHost<?, ?, ?>, Long> entry : vhs.entrySet())
         {
             long virtualHostTotalQueueSize = entry.getValue();
@@ -547,7 +547,7 @@ public class BrokerAdapter extends 
AbstractConfiguredObject<BrokerAdapter> imple
             }
             else
             {
-                long queueSizeBasedShare = (totalTarget * 
virtualHostTotalQueueSize) / (2 * totalSize);
+                long queueSizeBasedShare = (_flowToDiskThreshold * 
virtualHostTotalQueueSize) / (2 * totalSize);
                 size = queueSizeBasedShare + (proportionalShare / 2);
             }
 
@@ -578,6 +578,8 @@ public class BrokerAdapter extends 
AbstractConfiguredObject<BrokerAdapter> imple
         long heapMemory = Runtime.getRuntime().maxMemory();
         getEventLogger().message(BrokerMessages.MAX_MEMORY(heapMemory, 
directMemory));
 
+        _flowToDiskThreshold = getContextValue(Long.class, BROKER_FLOW_TO_DISK_THRESHOLD);
+
         if (SystemUtils.getProcessPid() != null)
         {
             
getEventLogger().message(BrokerMessages.PROCESS(SystemUtils.getProcessPid()));
@@ -785,6 +787,12 @@ public class BrokerAdapter extends 
AbstractConfiguredObject<BrokerAdapter> imple
         _dataReceived.registerEvent(messageSize, timestamp);
     }
 
+    @Override
+    public long getFlowToDiskThreshold()
+    {
+        return _flowToDiskThreshold;
+    }
+
     public StatisticsCounter getMessageReceiptStatistics()
     {
         return _messagesReceived;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2b91b441/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java 
b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index d851326..32efff2 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -56,6 +56,8 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.filter.SelectorParsingException;
 import org.apache.qpid.filter.selector.ParseException;
 import org.apache.qpid.filter.selector.TokenMgrError;
@@ -90,6 +92,7 @@ import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfigurationChangeListener;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Exchange;
@@ -297,6 +300,7 @@ public abstract class AbstractQueue<X extends 
AbstractQueue<X>>
     private final ConcurrentMap<String, Callable<MessageFilter>> 
_defaultFiltersMap = new ConcurrentHashMap<>();
     private final List<HoldMethod> _holdMethods = new CopyOnWriteArrayList<>();
     private Map<String, String> _mimeTypeToFileExtension = 
Collections.emptyMap();
+    private long _flowToDiskThreshold;
 
     private interface HoldMethod
     {
@@ -491,6 +495,7 @@ public abstract class AbstractQueue<X extends 
AbstractQueue<X>>
 
         _maxAsyncDeliveries = getContextValue(Integer.class, 
Queue.MAX_ASYNCHRONOUS_DELIVERIES);
         _mimeTypeToFileExtension = getContextValue(Map.class, 
MAP_OF_STRING_STRING, MIME_TYPE_TO_FILE_EXTENSION);
+        _flowToDiskThreshold = _virtualHost.getBroker().getFlowToDiskThreshold();
 
         if(_defaultFilters != null)
         {
@@ -3697,7 +3702,9 @@ public abstract class AbstractQueue<X extends 
AbstractQueue<X>>
 
         void flowToDiskIfNecessary(StoredMessage<?> storedMessage, long 
estimatedQueueSize, final long targetQueueSize)
         {
-            if ((estimatedQueueSize > targetQueueSize) && 
storedMessage.isInMemory())
+            if ((estimatedQueueSize > targetQueueSize
+                 || QpidByteBuffer.getAllocatedDirectMemorySize() > _flowToDiskThreshold)
+                && storedMessage.isInMemory())
             {
                 storedMessage.flowToDisk();
             }
@@ -3713,7 +3720,8 @@ public abstract class AbstractQueue<X extends 
AbstractQueue<X>>
 
         void reportFlowToDiskStatusIfNecessary(final long estimatedQueueSize, 
final long targetQueueSize)
         {
-            if (estimatedQueueSize > targetQueueSize)
+            if (estimatedQueueSize > targetQueueSize
+                || QpidByteBuffer.getAllocatedDirectMemorySize() > _flowToDiskThreshold)
             {
                 reportFlowToDiskActiveIfNecessary(estimatedQueueSize, 
targetQueueSize);
             }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2b91b441/common/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java 
b/common/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java
index cb0b5ba..b2dbf4e 100644
--- a/common/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java
+++ b/common/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java
@@ -22,11 +22,13 @@ package org.apache.qpid.bytebuffer;
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 class BufferPool
 {
     private final int _maxSize;
     private final ConcurrentLinkedQueue<ByteBuffer> _pooledBuffers = new 
ConcurrentLinkedQueue<>();
+    private final AtomicInteger _size = new AtomicInteger();
 
     BufferPool(final int maxSize)
     {
@@ -35,15 +37,21 @@ class BufferPool
 
     ByteBuffer getBuffer()
     {
-        return _pooledBuffers.poll();
+        final ByteBuffer buffer = _pooledBuffers.poll();
+        if (buffer != null)
+        {
+            _size.decrementAndGet();
+        }
+        return buffer;
     }
 
     void returnBuffer(ByteBuffer buf)
     {
         buf.clear();
-        if (_pooledBuffers.size() < _maxSize)
+        if (size() < _maxSize)
         {
             _pooledBuffers.add(buf);
+            _size.incrementAndGet();
         }
     }
 
@@ -51,4 +59,9 @@ class BufferPool
     {
         return _maxSize;
     }
+
+    public int size()
+    {
+        return _size.get();
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2b91b441/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java 
b/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java
index 807dfe9..90b5b6f 100644
--- a/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java
+++ b/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java
@@ -21,11 +21,13 @@
 package org.apache.qpid.bytebuffer;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 class PooledByteBufferRef implements ByteBufferRef
 {
     private static final AtomicIntegerFieldUpdater<PooledByteBufferRef> 
REF_COUNT = AtomicIntegerFieldUpdater.newUpdater(PooledByteBufferRef.class, 
"_refCount");
+    private static final AtomicInteger ACTIVE_BUFFERS = new AtomicInteger();
 
     private final ByteBuffer _buffer;
     private volatile int _refCount;
@@ -33,6 +35,7 @@ class PooledByteBufferRef implements ByteBufferRef
     PooledByteBufferRef(final ByteBuffer buffer)
     {
         _buffer = buffer;
+        ACTIVE_BUFFERS.incrementAndGet();
     }
 
     @Override
@@ -51,6 +54,7 @@ class PooledByteBufferRef implements ByteBufferRef
         if(REF_COUNT.get(this) > 0 && REF_COUNT.decrementAndGet(this) == 0)
         {
             QpidByteBuffer.returnToPool(_buffer);
+            ACTIVE_BUFFERS.decrementAndGet();
         }
     }
 
@@ -66,5 +70,9 @@ class PooledByteBufferRef implements ByteBufferRef
         REF_COUNT.set(this, Integer.MIN_VALUE/2);
     }
 
+    public static int getActiveBufferCount()
+    {
+       return ACTIVE_BUFFERS.get();
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2b91b441/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java 
b/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
index 3c3f099..9d09f9e 100644
--- a/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
+++ b/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
@@ -711,6 +711,21 @@ public final class QpidByteBuffer
         return _pooledBufferSize;
     }
 
+    public static int getAllocatedDirectMemorySize()
+    {
+        return _pooledBufferSize * getNumberOfActivePooledBuffers();
+    }
+
+    public static int getNumberOfActivePooledBuffers()
+    {
+        return PooledByteBufferRef.getActiveBufferCount();
+    }
+
+    public static int getNumberOfPooledBuffers()
+    {
+        return _bufferPool.size();
+    }
+
     private final class BufferInputStream extends InputStream
     {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to