QPID-7763: [Java Broker] Flow to disk if allocated direct memory exceeds broker wide broker.flowToDiskThreshold
(cherry picked from commit 07ea26e7b8fdc24621fcde04949df633186fa5a0) Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/2b91b441 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/2b91b441 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/2b91b441 Branch: refs/heads/6.0.x Commit: 2b91b4419841fb84bb97028d6995f7f125570412 Parents: aad81a6 Author: Alex Rudyy <[email protected]> Authored: Wed May 3 17:13:52 2017 +0100 Committer: Alex Rudyy <[email protected]> Committed: Wed May 3 17:56:40 2017 +0100 ---------------------------------------------------------------------- .../org/apache/qpid/server/model/Broker.java | 4 ++++ .../server/model/adapter/BrokerAdapter.java | 24 +++++++++++++------- .../apache/qpid/server/queue/AbstractQueue.java | 12 ++++++++-- .../org/apache/qpid/bytebuffer/BufferPool.java | 17 ++++++++++++-- .../qpid/bytebuffer/PooledByteBufferRef.java | 8 +++++++ .../apache/qpid/bytebuffer/QpidByteBuffer.java | 15 ++++++++++++ 6 files changed, 68 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2b91b441/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java index 2c09dc9..059eded 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java +++ b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java @@ -280,4 +280,8 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL void assignTargetSizes(); int getNetworkBufferSize(); + + @DerivedAttribute(description = "Threshold direct memory size (in bytes) at which the Broker will start flowing incoming messages to disk.") + long getFlowToDiskThreshold(); + } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2b91b441/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java index 8ded399..17defb8 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java +++ b/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java @@ -131,6 +131,7 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple private final BufferPoolMXBean _bufferPoolMXBean; private final List<String> _jvmArguments; private String _documentationUrl; + private long _flowToDiskThreshold; @ManagedObjectFactoryConstructor public BrokerAdapter(Map<String, Object> attributes, @@ -507,8 +508,7 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple @Override public synchronized void assignTargetSizes() { - long totalTarget = getContextValue(Long.class, BROKER_FLOW_TO_DISK_THRESHOLD); - LOGGER.debug("Assigning target sizes based on total target {}", totalTarget); + LOGGER.debug("Assigning target sizes based on total target {}", _flowToDiskThreshold); long totalSize = 0l; Collection<VirtualHostNode<?>> vhns = getVirtualHostNodes(); Map<VirtualHost<?, ?, ?>, Long> vhs = new HashMap<>(); @@ -523,20 +523,20 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple } } - if (totalSize > totalTarget && !_totalMessageSizeExceedThresholdReported) + if (totalSize > _flowToDiskThreshold && !_totalMessageSizeExceedThresholdReported) { - _eventLogger.message(BrokerMessages.FLOW_TO_DISK_ACTIVE(totalSize / 1024, totalTarget / 1024)); + _eventLogger.message(BrokerMessages.FLOW_TO_DISK_ACTIVE(totalSize / 1024, _flowToDiskThreshold / 1024)); _totalMessageSizeExceedThresholdReported = true; _totalMessageSizeWithinThresholdReported = false; } - else if (totalSize <= totalTarget && !_totalMessageSizeWithinThresholdReported) + else if (totalSize <= _flowToDiskThreshold && !_totalMessageSizeWithinThresholdReported) { - _eventLogger.message(BrokerMessages.FLOW_TO_DISK_INACTIVE(totalSize / 1024, totalTarget / 1024)); + _eventLogger.message(BrokerMessages.FLOW_TO_DISK_INACTIVE(totalSize / 1024, _flowToDiskThreshold / 1024)); _totalMessageSizeWithinThresholdReported = true; _totalMessageSizeExceedThresholdReported = false; } - final long proportionalShare = (long) ((double) totalTarget / (double) vhs.size()); + final long proportionalShare = (long) ((double) _flowToDiskThreshold / (double) vhs.size()); for (Map.Entry<VirtualHost<?, ?, ?>, Long> entry : vhs.entrySet()) { long virtualHostTotalQueueSize = entry.getValue(); @@ -547,7 +547,7 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple } else { - long queueSizeBasedShare = (totalTarget * virtualHostTotalQueueSize) / (2 * totalSize); + long queueSizeBasedShare = (_flowToDiskThreshold * virtualHostTotalQueueSize) / (2 * totalSize); size = queueSizeBasedShare + (proportionalShare / 2); } @@ -578,6 +578,8 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple long heapMemory = Runtime.getRuntime().maxMemory(); getEventLogger().message(BrokerMessages.MAX_MEMORY(heapMemory, directMemory)); + _flowToDiskThreshold = getContextValue(Long.class, BROKER_FLOW_TO_DISK_THRESHOLD); + if (SystemUtils.getProcessPid() != null) { getEventLogger().message(BrokerMessages.PROCESS(SystemUtils.getProcessPid())); @@ -785,6 +787,12 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple _dataReceived.registerEvent(messageSize, timestamp); } + @Override + public long getFlowToDiskThreshold() + { + return _flowToDiskThreshold; + } + public StatisticsCounter getMessageReceiptStatistics() { return _messagesReceived; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2b91b441/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index d851326..32efff2 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -56,6 +56,8 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; + +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.filter.SelectorParsingException; import org.apache.qpid.filter.selector.ParseException; import org.apache.qpid.filter.selector.TokenMgrError; @@ -90,6 +92,7 @@ import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Exchange; @@ -297,6 +300,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> private final ConcurrentMap<String, Callable<MessageFilter>> _defaultFiltersMap = new ConcurrentHashMap<>(); private final List<HoldMethod> _holdMethods = new CopyOnWriteArrayList<>(); private Map<String, String> _mimeTypeToFileExtension = Collections.emptyMap(); + private long _flowToDiskThreshold; private interface HoldMethod { @@ -491,6 +495,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> _maxAsyncDeliveries = getContextValue(Integer.class, Queue.MAX_ASYNCHRONOUS_DELIVERIES); _mimeTypeToFileExtension = getContextValue(Map.class, MAP_OF_STRING_STRING, MIME_TYPE_TO_FILE_EXTENSION); + _flowToDiskThreshold = _virtualHost.getBroker().getFlowToDiskThreshold(); if(_defaultFilters != null) { @@ -3697,7 +3702,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> void flowToDiskIfNecessary(StoredMessage<?> storedMessage, long estimatedQueueSize, final long targetQueueSize) { - if ((estimatedQueueSize > targetQueueSize) && storedMessage.isInMemory()) + if ((estimatedQueueSize > targetQueueSize + || QpidByteBuffer.getAllocatedDirectMemorySize() > _flowToDiskThreshold) + && storedMessage.isInMemory()) { storedMessage.flowToDisk(); } @@ -3713,7 +3720,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> void reportFlowToDiskStatusIfNecessary(final long estimatedQueueSize, final long targetQueueSize) { - if (estimatedQueueSize > targetQueueSize) + if (estimatedQueueSize > targetQueueSize + || QpidByteBuffer.getAllocatedDirectMemorySize() > _flowToDiskThreshold) { reportFlowToDiskActiveIfNecessary(estimatedQueueSize, targetQueueSize); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2b91b441/common/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java b/common/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java index cb0b5ba..b2dbf4e 100644 --- a/common/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java +++ b/common/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java @@ -22,11 +22,13 @@ package org.apache.qpid.bytebuffer; import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; class BufferPool { private final int _maxSize; private final ConcurrentLinkedQueue<ByteBuffer> _pooledBuffers = new ConcurrentLinkedQueue<>(); + private final AtomicInteger _size = new AtomicInteger(); BufferPool(final int maxSize) { @@ -35,15 +37,21 @@ class BufferPool ByteBuffer getBuffer() { - return _pooledBuffers.poll(); + final ByteBuffer buffer = _pooledBuffers.poll(); + if (buffer != null) + { + _size.decrementAndGet(); + } + return buffer; } void returnBuffer(ByteBuffer buf) { buf.clear(); - if (_pooledBuffers.size() < _maxSize) + if (size() < _maxSize) { _pooledBuffers.add(buf); + _size.incrementAndGet(); } } @@ -51,4 +59,9 @@ class BufferPool { return _maxSize; } + + public int size() + { + return _size.get(); + } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2b91b441/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java b/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java index 807dfe9..90b5b6f 100644 --- a/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java +++ b/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java @@ -21,11 +21,13 @@ package org.apache.qpid.bytebuffer; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; class PooledByteBufferRef implements ByteBufferRef { private static final AtomicIntegerFieldUpdater<PooledByteBufferRef> REF_COUNT = AtomicIntegerFieldUpdater.newUpdater(PooledByteBufferRef.class, "_refCount"); + private static final AtomicInteger ACTIVE_BUFFERS = new AtomicInteger(); private final ByteBuffer _buffer; private volatile int _refCount; @@ -33,6 +35,7 @@ class PooledByteBufferRef implements ByteBufferRef PooledByteBufferRef(final ByteBuffer buffer) { _buffer = buffer; + ACTIVE_BUFFERS.incrementAndGet(); } @Override @@ -51,6 +54,7 @@ class PooledByteBufferRef implements ByteBufferRef if(REF_COUNT.get(this) > 0 && REF_COUNT.decrementAndGet(this) == 0) { QpidByteBuffer.returnToPool(_buffer); + ACTIVE_BUFFERS.decrementAndGet(); } } @@ -66,5 +70,9 @@ class PooledByteBufferRef implements ByteBufferRef REF_COUNT.set(this, Integer.MIN_VALUE/2); } + public static int getActiveBufferCount() + { + return ACTIVE_BUFFERS.get(); + } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2b91b441/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java b/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java index 3c3f099..9d09f9e 100644 --- a/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java +++ b/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java @@ -711,6 +711,21 @@ public final class QpidByteBuffer return _pooledBufferSize; } + public static int getAllocatedDirectMemorySize() + { + return _pooledBufferSize * getNumberOfActivePooledBuffers(); + } + + public static int getNumberOfActivePooledBuffers() + { + return PooledByteBufferRef.getActiveBufferCount(); + } + + public static int getNumberOfPooledBuffers() + { + return _bufferPool.size(); + } + private final class BufferInputStream extends InputStream { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
