Author: dejanb
Date: Tue Sep 22 12:07:44 2009
New Revision: 817625
URL: http://svn.apache.org/viewvc?rev=817625&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2400 - producer flow control log messages for queues
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=817625&r1=817624&r2=817625&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Tue Sep 22 12:07:44 2009
@@ -412,6 +412,9 @@
isFull(context, memoryUsage);
fastProducer(context, producerInfo);
if (isProducerFlowControl() && context.isProducerFlowControl()) {
+ final String logMessage = "Usage Manager memory limit reached.
Stopping producer (" + message.getProducerId() + ") to prevent flooding " +
getActiveMQDestination().getQualifiedName() + "." +
+ " See
http://activemq.apache.org/producer-flow-control.html for more info";
+ LOG.info(logMessage);
if (systemUsage.isSendFailIfNoSpace()) {
throw new
javax.jms.ResourceAllocationException("SystemUsage memory limit reached");
}
@@ -497,9 +500,14 @@
final ConnectionContext context =
producerExchange.getConnectionContext();
synchronized (sendLock) {
if (store != null && message.isPersistent()) {
- if (systemUsage.isSendFailIfNoSpace() &&
systemUsage.getStoreUsage().isFull()) {
- throw new javax.jms.ResourceAllocationException("Usage
Manager Store is Full");
- }
+ if (systemUsage.getStoreUsage().isFull()) {
+ final String logMessage = "Usage Manager Store is Full.
Stopping producer (" + message.getProducerId() + ") to prevent flooding " +
getActiveMQDestination().getQualifiedName() + "." +
+ " See
http://activemq.apache.org/producer-flow-control.html for more info";
+ LOG.info(logMessage);
+ if (systemUsage.isSendFailIfNoSpace()) {
+ throw new
javax.jms.ResourceAllocationException(logMessage);
+ }
+ }
while (!systemUsage.getStoreUsage().waitForSpace(1000)) {
if (context.getStopping().get()) {
throw new IOException(
@@ -1278,6 +1286,14 @@
final void sendMessage(final ConnectionContext context, Message msg)
throws Exception {
if (!msg.isPersistent() && messages.getSystemUsage() != null) {
+ if (systemUsage.getTempUsage().isFull()) {
+ final String logMessage = "Usage Manager Temp Store is
Full. Stopping producer (" + msg.getProducerId() + ") to prevent flooding " +
getActiveMQDestination().getQualifiedName() + "." +
+ " See
http://activemq.apache.org/producer-flow-control.html for more info";
+ LOG.info(logMessage);
+ if (systemUsage.isSendFailIfNoSpace()) {
+ throw new
javax.jms.ResourceAllocationException(logMessage);
+ }
+ }
messages.getSystemUsage().getTempUsage().waitForSpace();
}
synchronized(messages) {