Author: chirino
Date: Fri Feb 23 12:22:24 2007
New Revision: 511078
URL: http://svn.apache.org/viewvc?view=rev&rev=511078
Log:
[EMAIL PROTECTED]: chirino | 2007-02-23 14:47:41 -0500
When a message send blocks on a destination-level usage manager, it blocks all
publishers on the same connection, even publishers that are publishing to
destinations whose limits have not been reached. In some scenarios, this can
result in a deadlock since it prevents publishing to a destination that could
otherwise receive messages.
This patch delays sending the response to sync publishers until the destination
usage allows the message to be sent, but does not block on the send. This
allows other producers on the same connection to get serviced but flow-controls
the producers on full destinations by delaying the send response.
In order to take advantage of this new producer flow control, which avoids the
described deadlock, sync sends must be used. To force sync sends for all send
requests, a new 'useSyncSend' option should be set to true on the
ActiveMQConnectionFactory.
Hopefully a future version of this patch will be developed that provides the same
feature but works with async sends and a producer ack to flow control the
producer.
Added:
activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
Modified:
activemq/branches/activemq-4.1/ (props changed)
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
Propchange: activemq/branches/activemq-4.1/
------------------------------------------------------------------------------
svk:merge = 635f1f41-eb29-0410-ac9d-be9e2c357fdd:/local/amq-4.1-port:234
Modified:
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?view=diff&rev=511078&r1=511077&r2=511078
==============================================================================
---
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Fri Feb 23 12:22:24 2007
@@ -128,6 +128,7 @@
private boolean optimizeAcknowledge = false;
private boolean nestedMapAndListEnabled = true;
private boolean useRetroactiveConsumer;
+ private boolean useSyncSend=false;
private int closeTimeout = 15000;
private final Transport transport;
@@ -1903,5 +1904,11 @@
}
+ public boolean isUseSyncSend() {
+ return useSyncSend;
+ }
+ public void setUseSyncSend(boolean forceSyncSend) {
+ this.useSyncSend = forceSyncSend;
+ }
}
Modified:
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?view=diff&rev=511078&r1=511077&r2=511078
==============================================================================
---
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
(original)
+++
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
Fri Feb 23 12:22:24 2007
@@ -85,6 +85,8 @@
private int closeTimeout = 15000;
private boolean useRetroactiveConsumer;
private boolean nestedMapAndListEnabled = true;
+ private boolean useSyncSend=false;
+
JMSStatsImpl factoryStats = new JMSStatsImpl();
static protected final Executor DEFAULT_CONNECTION_EXECUTOR = new
ScheduledThreadPoolExecutor(5, new ThreadFactory() {
@@ -256,6 +258,7 @@
connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
connection.setRedeliveryPolicy(getRedeliveryPolicy());
+ connection.setUseSyncSend(isUseSyncSend());
transport.start();
@@ -507,10 +510,13 @@
props.setProperty("password", getPassword());
}
+
+ props.setProperty("useSyncSend", Boolean.toString(isUseSyncSend()));
props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend()));
props.setProperty("useCompression",
Boolean.toString(isUseCompression()));
props.setProperty("useRetroactiveConsumer",
Boolean.toString(isUseRetroactiveConsumer()));
-
+
+
if (getUserName() != null) {
props.setProperty("userName", getUserName());
}
@@ -678,4 +684,12 @@
public void setStatsEnabled(boolean statsEnabled){
this.factoryStats.setEnabled(statsEnabled);
}
+
+ public boolean isUseSyncSend() {
+ return useSyncSend;
+ }
+
+ public void setUseSyncSend(boolean forceSyncSend) {
+ this.useSyncSend = forceSyncSend;
+ }
}
Modified:
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?view=diff&rev=511078&r1=511077&r2=511078
==============================================================================
---
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
(original)
+++
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
Fri Feb 23 12:22:24 2007
@@ -1544,7 +1544,7 @@
log.debug("Sending message: " + msg);
}
- if(!msg.isPersistent() || connection.isUseAsyncSend() || txid!=null) {
+ if( !connection.isUseSyncSend() && ( !msg.isPersistent() ||
connection.isUseAsyncSend() || txid!=null) ) {
this.connection.asyncSendPacket(msg);
} else {
this.connection.syncSendPacket(msg);
Modified:
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?view=diff&rev=511078&r1=511077&r2=511078
==============================================================================
---
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
(original)
+++
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
Fri Feb 23 12:22:24 2007
@@ -54,6 +54,7 @@
private boolean producerFlowControl=true;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
private AtomicInteger referenceCounter = new AtomicInteger();
+ private boolean dontSendReponse;
private final MessageEvaluationContext messageEvaluationContext = new
MessageEvaluationContext();
@@ -244,6 +245,14 @@
public int decrementReference() {
return referenceCounter.decrementAndGet();
+ }
+
+ public boolean isDontSendReponse() {
+ return dontSendReponse;
+ }
+
+ public void setDontSendReponse(boolean dontSendReponse) {
+ this.dontSendReponse = dontSendReponse;
}
}
Modified:
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=511078&r1=511077&r2=511078
==============================================================================
---
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Fri Feb 23 12:22:24 2007
@@ -124,6 +124,7 @@
protected final AtomicBoolean disposed=new AtomicBoolean(false);
private CountDownLatch stopLatch = new CountDownLatch(1);
protected final AtomicBoolean asyncException = new AtomicBoolean(false);
+ private ConnectionContext context;
static class ConnectionState extends
org.apache.activemq.state.ConnectionState {
private final ConnectionContext context;
@@ -299,6 +300,18 @@
response = new Response();
}
response.setCorrelationId(commandId);
+ if( context!=null && context.isDontSendReponse() ) {
+ // No need to send back a response at this time.
+ } else {
+ if( response == null ) {
+ response = new Response();
+ }
+ response.setCorrelationId(commandId);
+ }
+ if( context!=null ) {
+ context.setDontSendReponse(false);
+ context=null;
+ }
}
return response;
@@ -461,7 +474,7 @@
ProducerId producerId = messageSend.getProducerId();
ConnectionState state = lookupConnectionState(producerId);
- ConnectionContext context = state.getContext();
+ context = state.getContext();
// If the message originates from this client connection,
// then, finde the associated producer state so we can do some dup
detection.
@@ -671,7 +684,7 @@
// Setup the context.
String clientId = info.getClientId();
- ConnectionContext context = new ConnectionContext();
+ context = new ConnectionContext();
context.setConnection(this);
context.setBroker(broker);
context.setConnector(connector);
Modified:
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=511078&r1=511077&r2=511078
==============================================================================
---
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Feb 23 12:22:24 2007
@@ -33,10 +33,12 @@
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.command.Response;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.selector.SelectorParser;
@@ -51,6 +53,12 @@
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
+import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import java.io.IOException;
import java.util.ArrayList;
@@ -267,19 +275,56 @@
}
}
+
+ static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 10, 10,
TimeUnit.SECONDS, new LinkedBlockingQueue());
public void send(final ConnectionContext context, final Message message)
throws Exception {
if (context.isProducerFlowControl()) {
- if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
- throw new javax.jms.ResourceAllocationException("Usage Manager
memory limit reached");
- }
- else {
- usageManager.waitForSpace();
- }
+ if( message.isResponseRequired() ) {
+ if( usageManager.isFull() ) {
+// System.out.println("Registering callback...");
+ Runnable callback = new Runnable() {
+ public void run() {
+// System.out.println("Callback triggering async
thread..");
+ threadPool.execute(new Runnable() {
+ public void run() {
+ try {
+// System.out.println("Async
thread start..");
+ sendMessage(context, message);
+ Response response = new
Response();
+
response.setCorrelationId(message.getCommandId());
+
context.getConnection().dispatchAsync(response);
+ } catch
(Exception e) {
+ ExceptionResponse
response = new ExceptionResponse(e);
+
response.setCorrelationId(message.getCommandId());
+
context.getConnection().dispatchAsync(response);
+ } finally {
+// System.out.println("Async
thread end..");
+ }
+ }
+ });
+ }
+ };
+ if( usageManager.notifyCallbackWhenNotFull(callback) ) {
+ context.setDontSendReponse(true);
+ return;
+ }
+ }
+ } else {
+ if (usageManager.isSendFailIfNoSpace() ) {
+ throw new javax.jms.ResourceAllocationException("Usage
Manager memory limit reached");
+ } else {
+ usageManager.waitForSpace();
+ }
+ }
}
- message.setRegionDestination(this);
+ sendMessage(context, message);
+ }
+
+ private void sendMessage(final ConnectionContext context, final Message
message) throws IOException, Exception {
+ message.setRegionDestination(this);
if (store != null && message.isPersistent())
store.addMessage(context, message);
@@ -301,7 +346,7 @@
finally {
node.decrementReferenceCount();
}
- }
+ }
public void dispose(ConnectionContext context) throws IOException {
if (store != null) {
Modified:
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java?view=diff&rev=511078&r1=511077&r2=511078
==============================================================================
---
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
(original)
+++
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
Fri Feb 23 12:22:24 2007
@@ -18,6 +18,7 @@
package org.apache.activemq.memory;
import java.util.Iterator;
+import java.util.LinkedList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -49,6 +50,7 @@
private final Object usageMutex = new Object();
private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList();
+ private final LinkedList callbacks = new LinkedList();
private boolean sendFailIfNoSpace;
@@ -92,6 +94,38 @@
}
}
}
+
+ /**
+ * @param callback
+ * @return true if the UsageManager was full. The callback will only be
called if this method returns true.
+ */
+ public boolean notifyCallbackWhenNotFull( final Runnable callback ) {
+
+ if(parent!=null) {
+ Runnable r = new Runnable(){
+ public void run() {
+ synchronized (usageMutex) {
+ if( percentUsage >= 100 ) {
+ callbacks.add(callback);
+ } else {
+ callback.run();
+ }
+ }
+ }
+ };
+ if( parent.notifyCallbackWhenNotFull(r) ) {
+ return true;
+ }
+ }
+ synchronized (usageMutex) {
+ if( percentUsage >= 100 ) {
+ callbacks.add(callback);
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
/**
* Increases the usage by the value amount.
@@ -247,6 +281,11 @@
if( oldPercentUsage >= 100 && newPercentUsage < 100 ) {
synchronized (usageMutex) {
usageMutex.notifyAll();
+ for (Iterator iter = callbacks.iterator(); iter.hasNext();) {
+ Runnable callback = (Runnable)
iter.next();
+ callback.run();
+ }
+ callbacks.clear();
}
}
Added:
activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java?view=auto&rev=511078
==============================================================================
---
activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
(added)
+++
activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
Fri Feb 23 12:22:24 2007
@@ -0,0 +1,439 @@
+package org.apache.activemq;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Random;
+
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
+import edu.emory.mathcs.backport.java.util.concurrent.Executors;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
+public class AMQDeadlockTest3 extends TestCase {
+
+ private static final String URL1 = "tcp://localhost:61616";
+
+ private static final String URL2 = "tcp://localhost:61617";
+
+ private static final String QUEUE1_NAME = "test.queue.1";
+
+ private static final String QUEUE2_NAME = "test.queue.2";
+
+ private static final int MAX_CONSUMERS = 1;
+
+ private static final int MAX_PRODUCERS = 1;
+
+ private static final int NUM_MESSAGE_TO_SEND = 10;
+
+ private AtomicInteger messageCount = new AtomicInteger();
+ private CountDownLatch doneLatch;
+
+ public void setUp() throws Exception {
+ }
+
+ public void tearDown() throws Exception {
+ }
+
+ // This should fail with incubator-activemq-fuse-4.1.0.5
+ public void testQueueLimitsWithOneBrokerSameConnection() throws
Exception {
+
+ BrokerService brokerService1 = null;
+ ActiveMQConnectionFactory acf = null;
+ PooledConnectionFactory pcf = null;
+ DefaultMessageListenerContainer container1 = null;
+
+ try {
+ brokerService1 = createBrokerService("broker1", URL1,
null);
+ brokerService1.start();
+
+ acf = createConnectionFactory(URL1);
+ pcf = new PooledConnectionFactory(acf);
+
+ // Only listen on the first queue.. let the 2nd queue
fill up.
+ doneLatch = new CountDownLatch(NUM_MESSAGE_TO_SEND);
+ container1 = createDefaultMessageListenerContainer(acf,
new TestMessageListener1(500), QUEUE1_NAME);
+ container1.afterPropertiesSet();
+
+ Thread.sleep(2000);
+
+ final ExecutorService executor =
Executors.newCachedThreadPool();
+ for (int i = 0; i < MAX_PRODUCERS; i++) {
+ executor.submit(new PooledProducerTask(pcf,
QUEUE2_NAME));
+ Thread.sleep(1000);
+ executor.submit(new PooledProducerTask(pcf,
QUEUE1_NAME));
+ }
+
+ // Wait for all message to arrive.
+ assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
+ executor.shutdownNow();
+
+ Assert.assertEquals(NUM_MESSAGE_TO_SEND,
messageCount.get());
+
+ } finally {
+
+ container1.stop();
+ container1.destroy();
+ container1 = null;
+ brokerService1.stop();
+ brokerService1 = null;
+
+ }
+
+ }
+
+
+
+
+ // This should fail with incubator-activemq-fuse-4.1.0.5
+ public void
testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithOneConnectionForProducing()
+ throws Exception {
+
+ BrokerService brokerService1 = null;
+ BrokerService brokerService2 = null;
+ ActiveMQConnectionFactory acf1 = null;
+ ActiveMQConnectionFactory acf2 = null;
+ PooledConnectionFactory pcf = null;
+ DefaultMessageListenerContainer container1 = null;
+
+ try {
+ brokerService1 = createBrokerService("broker1", URL1,
URL2);
+ brokerService1.start();
+ brokerService2 = createBrokerService("broker2", URL2,
URL1);
+ brokerService2.start();
+
+ acf1 = createConnectionFactory(URL1);
+ acf2 = createConnectionFactory(URL2);
+
+ pcf = new PooledConnectionFactory(acf1);
+
+ Thread.sleep(1000);
+
+ doneLatch = new CountDownLatch(MAX_PRODUCERS *
NUM_MESSAGE_TO_SEND);
+ container1 =
createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500),
QUEUE1_NAME);
+ container1.afterPropertiesSet();
+
+ final ExecutorService executor =
Executors.newCachedThreadPool();
+ for (int i = 0; i < MAX_PRODUCERS; i++) {
+ executor.submit(new PooledProducerTask(pcf,
QUEUE2_NAME));
+ Thread.sleep(1000);
+ executor.submit(new PooledProducerTask(pcf,
QUEUE1_NAME));
+ }
+
+ assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
+ executor.shutdownNow();
+
+ Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND,
+ messageCount.get());
+ } finally {
+
+ container1.stop();
+ container1.destroy();
+ container1 = null;
+
+ brokerService1.stop();
+ brokerService1 = null;
+ brokerService2.stop();
+ brokerService2 = null;
+ }
+ }
+
+
+ // This should fail with incubator-activemq-fuse-4.1.0.5
+ public void
testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithSeperateConnectionsForProducing()
+ throws Exception {
+
+ BrokerService brokerService1 = null;
+ BrokerService brokerService2 = null;
+ ActiveMQConnectionFactory acf1 = null;
+ ActiveMQConnectionFactory acf2 = null;
+ DefaultMessageListenerContainer container1 = null;
+ DefaultMessageListenerContainer container2 = null;
+
+ try {
+ brokerService1 = createBrokerService("broker1", URL1,
URL2);
+ brokerService1.start();
+ brokerService2 = createBrokerService("broker2", URL2,
URL1);
+ brokerService2.start();
+
+ acf1 = createConnectionFactory(URL1);
+ acf2 = createConnectionFactory(URL2);
+
+ Thread.sleep(1000);
+
+ doneLatch = new
CountDownLatch(NUM_MESSAGE_TO_SEND*MAX_PRODUCERS);
+
+ container1 =
createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500),
QUEUE1_NAME);
+ container1.afterPropertiesSet();
+ container2 =
createDefaultMessageListenerContainer(acf2, new TestMessageListener1(30000),
QUEUE2_NAME);
+ container2.afterPropertiesSet();
+
+ final ExecutorService executor =
Executors.newCachedThreadPool();
+ for (int i = 0; i < MAX_PRODUCERS; i++) {
+ executor.submit(new NonPooledProducerTask(acf1,
QUEUE2_NAME));
+ Thread.sleep(1000);
+ executor.submit(new NonPooledProducerTask(acf1,
QUEUE1_NAME));
+ }
+
+ assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
+ executor.shutdownNow();
+
+ Assert.assertEquals(MAX_PRODUCERS *
NUM_MESSAGE_TO_SEND, messageCount.get());
+ } finally {
+
+ container1.stop();
+ container1.destroy();
+ container1 = null;
+
+ container2.stop();
+ container2.destroy();
+ container2 = null;
+
+ brokerService1.stop();
+ brokerService1 = null;
+ brokerService2.stop();
+ brokerService2 = null;
+ }
+ }
+
+
+
+
+ private BrokerService createBrokerService(final String brokerName,
+ final String uri1, final String uri2) throws Exception {
+ final BrokerService brokerService = new BrokerService();
+
+ brokerService.setBrokerName(brokerName);
+ brokerService.setPersistent(false);
+ brokerService.setUseJmx(true);
+
+ final UsageManager memoryManager = new UsageManager();
+ memoryManager.setLimit(5000000);
+ brokerService.setMemoryManager(memoryManager);
+
+ final ArrayList policyEntries = new ArrayList();
+
+ final PolicyEntry entry = new PolicyEntry();
+ entry.setQueue(">");
+ // entry.setQueue(QUEUE1_NAME);
+ entry.setMemoryLimit(1000);
+ policyEntries.add(entry);
+
+ final PolicyMap policyMap = new PolicyMap();
+ policyMap.setPolicyEntries(policyEntries);
+ brokerService.setDestinationPolicy(policyMap);
+
+ final TransportConnector tConnector = new TransportConnector();
+ tConnector.setUri(new URI(uri1));
+ tConnector.setBrokerName(brokerName);
+ tConnector.setName(brokerName + ".transportConnector");
+ brokerService.addConnector(tConnector);
+
+ if (uri2 != null) {
+ final NetworkConnector nc = new
DiscoveryNetworkConnector(new URI("static:" + uri2));
+ nc.setBridgeTempDestinations(true);
+ nc.setBrokerName(brokerName);
+ nc.setName(brokerName + ".nc");
+ brokerService.addNetworkConnector(nc);
+ }
+
+ return brokerService;
+
+ }
+
+ public DefaultMessageListenerContainer
createDefaultMessageListenerContainer(
+ final ConnectionFactory acf, final MessageListener
listener,
+ final String queue) {
+ final DefaultMessageListenerContainer container = new
DefaultMessageListenerContainer();
+ container.setConnectionFactory(acf);
+ container.setDestinationName(queue);
+ container.setMessageListener(listener);
+ container.setSessionTransacted(false);
+ container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
+ container.setConcurrentConsumers(MAX_CONSUMERS);
+ return container;
+ }
+
+ public ActiveMQConnectionFactory createConnectionFactory(final String
url) {
+ final ActiveMQConnectionFactory acf = new
ActiveMQConnectionFactory(url);
+ acf.setCopyMessageOnSend(false);
+ acf.setUseAsyncSend(false);
+ acf.setDispatchAsync(true);
+ acf.setUseCompression(false);
+ acf.setOptimizeAcknowledge(false);
+ acf.setOptimizedMessageDispatch(true);
+ acf.setUseSyncSend(true);
+ return acf;
+ }
+
+ private class TestMessageListener1 implements MessageListener {
+
+ private final long waitTime;
+
+ public TestMessageListener1(long waitTime) {
+ this.waitTime = waitTime;
+
+ }
+
+ public void onMessage(Message msg) {
+
+ try {
+ System.out.println("Listener1 Consumed message
"+ msg.getIntProperty("count"));
+
+ messageCount.incrementAndGet();
+ doneLatch.countDown();
+
+ Thread.sleep(waitTime);
+ } catch (JMSException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ }
+ }
+
+
+ private class PooledProducerTask implements Runnable {
+
+ private final String queueName;
+
+ private final PooledConnectionFactory pcf;
+
+ public PooledProducerTask(final PooledConnectionFactory pcf,
+ final String queueName) {
+ this.pcf = pcf;
+ this.queueName = queueName;
+ }
+
+ public void run() {
+
+ try {
+
+ final JmsTemplate jmsTemplate = new
JmsTemplate(pcf);
+
jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ jmsTemplate.setExplicitQosEnabled(true);
+ jmsTemplate.setMessageIdEnabled(false);
+ jmsTemplate.setMessageTimestampEnabled(false);
+ jmsTemplate.afterPropertiesSet();
+
+ final byte[] bytes = new byte[2048];
+ final Random r = new Random();
+ r.nextBytes(bytes);
+
+ Thread.sleep(2000);
+
+ final AtomicInteger count = new AtomicInteger();
+ for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
+ jmsTemplate.send(queueName, new
MessageCreator() {
+
+ public Message
createMessage(Session session)
+ throws
JMSException {
+
+ final BytesMessage
message = session.createBytesMessage();
+
+
message.writeBytes(bytes);
+
message.setIntProperty("count", count.incrementAndGet());
+
message.setStringProperty("producer", "pooled");
+ return message;
+ }
+ });
+
+ System.out.println("PooledProducer sent
message: "+ count.get());
+ // Thread.sleep(1000);
+ }
+
+ } catch (final Throwable e) {
+ System.err.println("Producer 1 is exiting.");
+ e.printStackTrace();
+ }
+ }
+ }
+
+
+ private class NonPooledProducerTask implements Runnable {
+
+ private final String queueName;
+
+ private final ConnectionFactory cf;
+
+ public NonPooledProducerTask(final ConnectionFactory cf,
+ final String queueName) {
+ this.cf = cf;
+ this.queueName = queueName;
+ }
+
+ public void run() {
+
+ try {
+
+ final JmsTemplate jmsTemplate = new
JmsTemplate(cf);
+
jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ jmsTemplate.setExplicitQosEnabled(true);
+ jmsTemplate.setMessageIdEnabled(false);
+ jmsTemplate.setMessageTimestampEnabled(false);
+ jmsTemplate.afterPropertiesSet();
+
+ final byte[] bytes = new byte[2048];
+ final Random r = new Random();
+ r.nextBytes(bytes);
+
+ Thread.sleep(2000);
+
+ final AtomicInteger count = new AtomicInteger();
+ for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
+ jmsTemplate.send(queueName, new
MessageCreator() {
+
+ public Message
createMessage(Session session)
+ throws
JMSException {
+
+ final BytesMessage
message = session
+
.createBytesMessage();
+
+
message.writeBytes(bytes);
+
message.setIntProperty("count", count
+
.incrementAndGet());
+
message.setStringProperty("producer", "non-pooled");
+ return message;
+ }
+ });
+
+ System.out.println("Non-PooledProducer
sent message: " + count.get());
+
+ // Thread.sleep(1000);
+ }
+
+ } catch (final Throwable e) {
+ System.err.println("Producer 1 is exiting.");
+ e.printStackTrace();
+ }
+ }
+ }
+
+}
Added:
activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java?view=auto&rev=511078
==============================================================================
---
activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
(added)
+++
activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
Fri Feb 23 12:22:24 2007
@@ -0,0 +1,161 @@
+package org.apache.activemq;
+
+import java.io.IOException;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.transport.tcp.TcpTransport;
+
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+public class ProducerFlowControlTest extends JmsTestSupport {
+
+ ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A");
+ ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B");
+ private TransportConnector connector;
+ private ActiveMQConnection connection;
+
+ public void test2ndPubisherWithSyncSendConnectionThatIsBlocked() throws
Exception {
+ ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)
createConnectionFactory();
+ factory.setUseSyncSend(true);
+ connection = (ActiveMQConnection) factory.createConnection();
+ connections.add(connection);
+ connection.start();
+
+ // Test sending to Queue A
+ // 1st send should not block.
+ fillQueue(queueA);
+
+ Session session = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(queueB);
+
+ // Test sending to Queue B it should block.
+ // Since even though the it's queue limits have not been reached, the
connection
+ // is blocked.
+ CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
+ assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
+
+ TextMessage msg = (TextMessage) consumer.receive();
+ assertEquals("Message 1", msg.getText());
+ msg.acknowledge();
+
+ pubishDoneToQeueuB = asyncSendTo(queueB, "Message 2");
+ assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
+
+ msg = (TextMessage) consumer.receive();
+ assertEquals("Message 2", msg.getText());
+ msg.acknowledge();
+ }
+
+ public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws
Exception {
+ ConnectionFactory factory = createConnectionFactory();
+ connection = (ActiveMQConnection) factory.createConnection();
+ connections.add(connection);
+ connection.start();
+
+ // Test sending to Queue A
+ // 1st send should not block.
+ fillQueue(queueA);
+
+ // Test sending to Queue B it should block.
+ // Since even though the it's queue limits have not been reached, the
connection
+ // is blocked.
+ CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
+ assertFalse( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
+ }
+
+
+ private void fillQueue(final ActiveMQQueue queue) throws JMSException,
InterruptedException {
+ final AtomicBoolean done = new AtomicBoolean(true);
+ final AtomicBoolean keepGoing = new AtomicBoolean(true);
+
+ // Starts an async thread that every time it publishes it sets
the done flag to false.
+ // Once the send starts to block it will not reset the done
flag anymore.
+ new Thread("Fill thread.") {
+ public void run() {
+ Session session=null;
+ try {
+ session =
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer =
session.createProducer(queue);
+
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ while( keepGoing.get() ) {
+ done.set(false);
+
producer.send(session.createTextMessage("Hello World"));
+ }
+ } catch (JMSException e) {
+ } finally {
+ safeClose(session);
+ }
+ }
+ }.start();
+
+ while( true ) {
+ Thread.sleep(1000);
+ // the producer is blocked once the done flag stays
true.
+ if( done.get() )
+ break;
+ done.set(true);
+ }
+ keepGoing.set(false);
+ }
+
+ private CountDownLatch asyncSendTo(final ActiveMQQueue queue, final
String message) throws JMSException {
+ final CountDownLatch done = new CountDownLatch(1);
+ new Thread("Send thread.") {
+ public void run() {
+ Session session=null;
+ try {
+ session =
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer =
session.createProducer(queue);
+
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
producer.send(session.createTextMessage(message));
+ done.countDown();
+ } catch (JMSException e) {
+ } finally {
+ safeClose(session);
+ }
+ }
+ }.start();
+ return done;
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ BrokerService service = new BrokerService();
+ service.setPersistent(false);
+ service.setUseJmx(false);
+
+ // Setup a destination policy where it takes only 1 message at a time.
+ PolicyMap policyMap = new PolicyMap();
+ PolicyEntry policy = new PolicyEntry();
+ policy.setMemoryLimit(1);
+ policyMap.setDefaultEntry(policy);
+ service.setDestinationPolicy(policyMap);
+
+ connector = service.addConnector("tcp://localhost:0");
+ return service;
+ }
+
+ protected void tearDown() throws Exception {
+ TcpTransport t = (TcpTransport)
connection.getTransport().narrow(TcpTransport.class);
+ t.getTransportListener().onException(new IOException("Disposed."));
+ connection.getTransport().stop();
+ super.tearDown();
+ }
+
+ protected ConnectionFactory createConnectionFactory() throws Exception {
+ return new ActiveMQConnectionFactory(connector.getConnectUri());
+ }
+}