Author: gtully
Date: Tue Jan 13 04:10:22 2009
New Revision: 734114
URL: http://svn.apache.org/viewvc?rev=734114&view=rev
Log:
add test case to exercise producerFlowControl sendFailIfNoSpace with queue
memory limit
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java
(with props)
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java?rev=734114&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java
Tue Jan 13 04:10:22 2009
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ResourceAllocationException;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import
org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import
org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
+
+public class ProducerFlowControlSendFailTest extends ProducerFlowControlTest {
+
+ protected BrokerService createBroker() throws Exception {
+ BrokerService service = new BrokerService();
+ service.setPersistent(false);
+ service.setUseJmx(false);
+
+ // Setup a destination policy where it takes only 1 message at a time.
+ PolicyMap policyMap = new PolicyMap();
+ PolicyEntry policy = new PolicyEntry();
+ policy.setMemoryLimit(1);
+ policy.setPendingSubscriberPolicy(new
VMPendingSubscriberMessageStoragePolicy());
+ policy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
+ policyMap.setDefaultEntry(policy);
+ service.setDestinationPolicy(policyMap);
+
+ service.getSystemUsage().setSendFailIfNoSpace(true);
+
+ connector = service.addConnector("tcp://localhost:0");
+ return service;
+ }
+
+ @Override
+ public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws
Exception {
+ // with sendFailIfNoSpace set, there is no blocking of the connection
+ }
+
+ @Override
+ public void testPubisherRecoverAfterBlock() throws Exception {
+ ActiveMQConnectionFactory factory =
(ActiveMQConnectionFactory)createConnectionFactory();
+ // with sendFail, there must be no flowControllwindow
+ // sendFail is an alternative flow control mechanism that does not
block
+ factory.setUseAsyncSend(true);
+ connection = (ActiveMQConnection)factory.createConnection();
+ connections.add(connection);
+ connection.start();
+
+ final Session session = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+ final MessageProducer producer = session.createProducer(queueA);
+
+ final AtomicBoolean keepGoing = new AtomicBoolean(true);
+
+ Thread thread = new Thread("Filler") {
+ @Override
+ public void run() {
+ while (keepGoing.get()) {
+ try {
+ producer.send(session.createTextMessage("Test
message"));
+ if (gotResourceException.get()) {
+ // do not flood the broker with requests when full
as we are sending async and they
+ // will be limited by the network buffers
+ Thread.sleep(200);
+ }
+ } catch (Exception e) {
+ // with async send, there will be no exceptions
+ e.printStackTrace();
+ }
+ }
+ }
+ };
+ thread.start();
+ waitForBlockedOrResourceLimit(new AtomicBoolean(false));
+
+ // resourceException on second message, resumption if we
+ // can receive 10
+ MessageConsumer consumer = session.createConsumer(queueA);
+ TextMessage msg;
+ for (int idx = 0; idx < 10; ++idx) {
+ msg = (TextMessage) consumer.receive(1000);
+ msg.acknowledge();
+ }
+ keepGoing.set(false);
+ }
+
+
+ @Override
+ protected ConnectionFactory createConnectionFactory() throws Exception {
+ ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(connector.getConnectUri());
+ connectionFactory.setExceptionListener(new ExceptionListener() {
+ public void onException(JMSException arg0) {
+ if (arg0 instanceof
ResourceAllocationException) {
+ gotResourceException.set(true);
+ }
+ }
+ });
+ return connectionFactory;
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java?rev=734114&r1=734113&r2=734114&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
Tue Jan 13 04:10:22 2009
@@ -42,8 +42,10 @@
ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A");
ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B");
- private TransportConnector connector;
- private ActiveMQConnection connection;
+ protected TransportConnector connector;
+ protected ActiveMQConnection connection;
+ // used to test sendFailIfNoSpace on SystemUsage
+ protected final AtomicBoolean gotResourceException = new
AtomicBoolean(false);
public void test2ndPubisherWithProducerWindowSendConnectionThatIsBlocked()
throws Exception {
ActiveMQConnectionFactory factory =
(ActiveMQConnectionFactory)createConnectionFactory();
@@ -89,6 +91,8 @@
final AtomicBoolean done = new AtomicBoolean(true);
final AtomicBoolean keepGoing = new AtomicBoolean(true);
+
+
Thread thread = new Thread("Filler") {
@Override
public void run() {
@@ -102,14 +106,7 @@
}
};
thread.start();
- while (true) {
- Thread.sleep(1000);
- // the producer is blocked once the done flag stays true.
- if (done.get()) {
- break;
- }
- done.set(true);
- }
+ waitForBlockedOrResourceLimit(done);
// after receiveing messges, producer should continue sending messages
// (done == false)
@@ -124,6 +121,7 @@
assertFalse(done.get());
}
+
public void test2ndPubisherWithSyncSendConnectionThatIsBlocked() throws
Exception {
ActiveMQConnectionFactory factory =
(ActiveMQConnectionFactory)createConnectionFactory();
factory.setAlwaysSyncSend(true);
@@ -224,15 +222,20 @@
}
}.start();
+ waitForBlockedOrResourceLimit(done);
+ keepGoing.set(false);
+ }
+
+ protected void waitForBlockedOrResourceLimit(final AtomicBoolean done)
+ throws InterruptedException {
while (true) {
Thread.sleep(1000);
- // the producer is blocked once the done flag stays true.
- if (done.get()) {
+ // the producer is blocked once the done flag stays true or there
is a resource exception
+ if (done.get() || gotResourceException.get()) {
break;
}
done.set(true);
}
- keepGoing.set(false);
}
private CountDownLatch asyncSendTo(final ActiveMQQueue queue, final String
message) throws JMSException {
@@ -274,10 +277,12 @@
}
protected void tearDown() throws Exception {
- TcpTransport t =
(TcpTransport)connection.getTransport().narrow(TcpTransport.class);
- t.getTransportListener().onException(new IOException("Disposed."));
- connection.getTransport().stop();
- super.tearDown();
+ if (connection != null) {
+ TcpTransport t =
(TcpTransport)connection.getTransport().narrow(TcpTransport.class);
+ t.getTransportListener().onException(new IOException("Disposed."));
+ connection.getTransport().stop();
+ super.tearDown();
+ }
}
protected ConnectionFactory createConnectionFactory() throws Exception {