Stirling Chow created AMQ-4121:
----------------------------------
Summary: Expose Destination.setMemoryUsage so that custom policies
can override default MemoryUsage
Key: AMQ-4121
URL: https://issues.apache.org/jira/browse/AMQ-4121
Project: ActiveMQ
Issue Type: New Feature
Affects Versions: 5.7.0
Reporter: Stirling Chow
Using queues as an example, but this also applies to topics...
When a queue is created, it inherits the System-wide (broker) usage manager and
its limits. A policy can be applied to the queue in order to specify a
destination-specific memory usage limit:
{code}
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
...
{code}
This policy entry is still dependent on the existing
org.apache.activemq.usage.MemoryUsage class:
{code:title=org.apache.activemq.broker.region.policy.PolicyEntry}
public void configure(Broker broker,Queue queue) {
baseConfiguration(broker,queue);
if (dispatchPolicy != null) {
queue.setDispatchPolicy(dispatchPolicy);
}
queue.setDeadLetterStrategy(getDeadLetterStrategy());
queue.setMessageGroupMapFactory(getMessageGroupMapFactory());
if (memoryLimit > 0) {
queue.getMemoryUsage().setLimit(memoryLimit);
}
...
{code}
We wanted to create a usage policy that would limit the number of messages in a
queue by count (i.e., max 1000 messages) rather than by memory usage.
The existing usage strategies are memory-centric. It would be nice if a
destination had a generic Usage property rather than a specific MemoryUsage
property, but barring refactoring the interfaces, it would be possible to
achieve our goal if we could *set* the MemoryUsage property of a queue; this is
as simple as adding a Destination.setMemoryUsage(MemoryUsage) property go with
the existing Destination.getMemoryUsage(MemoryUsage) property.
The attached patch contains the trivial changes to expose
Destination.setMemoryUsage. The patch also includes an example of an extended
policy that allows a discrete (i.e., count based) usage policy on queues.
Rather than enforcing a specific limit to the number of messages in a queue,
this policy determines the limit as a multiple of the number of consumers. It
would be even easier to do the former, but our use case required the latter.
Also note that this consumer-ratio policy is orthogonal to the consumer
prefetch limit --- it's a policy that we needed to prevent a distributed queue
with many messages from being completely drained to a remote broker during
startup (i.e., before additional brokers had started).
{code:Example of ExtendedPolicy configuration}
<bean class="com.invoqsystems.foundation.activemq.ExtendedPolicyEntry">
<property name="queue" value="shared.notification.*" />
<property name="messageToConsumerRatioLimit" value="99" />
</bean>
{code}
{code:com.invoqsystems.foundation.activemq.ExtendedPolicyEntry}
public class ExtendedPolicyEntry extends PolicyEntry {
private long messageToConsumerRatioLimit;
/**
* This is called by AMQ when a queue is first created. If a
message-to-consumer ratio is specified, a
* {@link DiscreteMemoryUsage} class with a {@link
ConsumerRatioUsageCapacity} limiter replaces the queue's
* byte-based MemoryUsage class. The original parent of the queue's
byte-based MemoryUsage becomes the parent of the
* {@link DiscreteMemoryUsage}, so it will also receive updates and can
signal when the queue is full (e.g., because
* of queue or system memory limits).
*/
@Override
public void configure(Broker broker, Queue queue) {
super.configure(broker, queue);
if (messageToConsumerRatioLimit > 0) {
DiscreteMemoryUsage ratioUsage = new
DiscreteMemoryUsage(queue.getMemoryUsage().getParent(), ":ratio");
ratioUsage.setLimiter(new ConsumerRatioUsageCapacity(queue));
ratioUsage.setLimit(messageToConsumerRatioLimit);
ratioUsage.setParent(queue.getMemoryUsage());
ratioUsage.setExecutor(queue.getMemoryUsage().getExecutor());
queue.setMemoryUsage(ratioUsage);
}
}
...
{code}
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira