Author: tabish
Date: Fri Sep 23 13:13:33 2011
New Revision: 1174734
URL: http://svn.apache.org/viewvc?rev=1174734&view=rev
Log:
Remove the dependency on the fixed 61616 port number
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java?rev=1174734&r1=1174733&r2=1174734&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java Fri Sep 23 13:13:33 2011
@@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
/**
* This is a test case for the issue reported at:
- * https://issues.apache.org/activemq/browse/AMQ-2021
+ * https://issues.apache.org/activemq/browse/AMQ-2021
* Bug is modification of inflight message properties so the failure can manifest itself in a bunch
* or ways, from message receipt with null properties to marshall errors
*/
@@ -53,25 +53,26 @@ public class AMQ2021Test extends TestCas
BrokerService brokerService;
ArrayList<Thread> threads = new ArrayList<Thread>();
Vector<Throwable> exceptions;
-
+
AMQ2021Test testCase;
-
- String ACTIVEMQ_BROKER_BIND = "tcp://localhost:61616";
- String ACTIVEMQ_BROKER_URL = ACTIVEMQ_BROKER_BIND + "?jms.redeliveryPolicy.maximumRedeliveries=1&jms.redeliveryPolicy.initialRedeliveryDelay=0";
-
- private int numMessages = 1000;
- private int numConsumers = 2;
- private int dlqMessages = numMessages/2;
-
- CountDownLatch receivedLatch;
+
+ private final String ACTIVEMQ_BROKER_BIND = "tcp://localhost:0";
+ private String CONSUMER_BROKER_URL = "?jms.redeliveryPolicy.maximumRedeliveries=1&jms.redeliveryPolicy.initialRedeliveryDelay=0";
+ private String PRODUCER_BROKER_URL;
+
+ private final int numMessages = 1000;
+ private final int numConsumers = 2;
+ private final int dlqMessages = numMessages/2;
+
+ private CountDownLatch receivedLatch;
private ActiveMQTopic destination;
- public CountDownLatch started;
+ private CountDownLatch started;
@Override
protected void setUp() throws Exception {
Thread.setDefaultUncaughtExceptionHandler(this);
testCase = this;
-
+
// Start an embedded broker up.
brokerService = new BrokerService();
brokerService.setDeleteAllMessagesOnStartup(true);
@@ -79,18 +80,21 @@ public class AMQ2021Test extends TestCas
brokerService.start();
destination = new ActiveMQTopic(getName());
exceptions = new Vector<Throwable>();
-
- receivedLatch =
+
+ CONSUMER_BROKER_URL = brokerService.getTransportConnectors().get(0).getPublishableConnectString() + CONSUMER_BROKER_URL;
+ PRODUCER_BROKER_URL = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
+
+ receivedLatch =
new CountDownLatch(numConsumers * (numMessages + dlqMessages));
started = new CountDownLatch(1);
}
-
+
@Override
protected void tearDown() throws Exception {
for (Thread t : threads) {
t.interrupt();
t.join();
- }
+ }
brokerService.stop();
}
@@ -101,9 +105,9 @@ public class AMQ2021Test extends TestCas
threads.add(c1);
c1.start();
}
-
+
assertTrue(started.await(10, TimeUnit.SECONDS));
-
+
Thread producer = new Thread() {
@Override
public void run() {
@@ -115,34 +119,34 @@ public class AMQ2021Test extends TestCas
};
threads.add(producer);
producer.start();
-
- boolean allGood = receivedLatch.await(30, TimeUnit.SECONDS);
+
+ boolean allGood = receivedLatch.await(90, TimeUnit.SECONDS);
for (Throwable t: exceptions) {
log.error("failing test with first exception", t);
fail("exception during test : " + t);
- }
+ }
assertTrue("excepted messages received within time limit", allGood);
-
+
assertEquals(0, exceptions.size());
-
+
for (int i=0; i<numConsumers; i++) {
// last recovery sends message to deq so is not received again
assertEquals(dlqMessages*2,
((ConsumerThread)threads.get(i)).recoveries);
assertEquals(numMessages + dlqMessages,
((ConsumerThread)threads.get(i)).counter);
}
-
+
// half of the messages for each consumer should go to the dlq but duplicates will
// be suppressed
consumeFromDLQ(dlqMessages);
-
- }
-
+
+ }
+
private void consumeFromDLQ( int messageCount) throws Exception {
- ActiveMQConnectionFactory connectionFactory =
- new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URL);
+ ActiveMQConnectionFactory connectionFactory =
+ new ActiveMQConnectionFactory(CONSUMER_BROKER_URL);
Connection connection = connectionFactory.createConnection();
connection.start();
-
+
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageConsumer dlqConsumer = session.createConsumer(new
ActiveMQQueue("ActiveMQ.DLQ"));
int count = 0;
@@ -158,19 +162,19 @@ public class AMQ2021Test extends TestCas
public void produce(int count) throws Exception {
Connection connection=null;
try {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_BIND);
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(PRODUCER_BROKER_URL);
connection = factory.createConnection();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination);
producer.setTimeToLive(0);
connection.start();
-
+
for (int i=0 ; i< count; i++) {
int id = i+1;
TextMessage message = session.createTextMessage(getName()+" Message "+ id);
message.setIntProperty("MsgNumber", id);
producer.send(message);
-
+
if (id % 500 == 0) {
log.info("sent " + id + ", ith " + message);
}
@@ -187,7 +191,7 @@ public class AMQ2021Test extends TestCas
}
}
}
-
+
public class ConsumerThread extends Thread implements MessageListener {
public long counter = 0;
public long recoveries = 0;
@@ -199,24 +203,24 @@ public class AMQ2021Test extends TestCas
public void run() {
try {
- ActiveMQConnectionFactory connectionFactory =
- new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URL);
+ ActiveMQConnectionFactory connectionFactory =
+ new ActiveMQConnectionFactory(CONSUMER_BROKER_URL);
Connection connection = connectionFactory.createConnection();
connection.setExceptionListener(testCase);
- connection.setClientID(getName());
+ connection.setClientID(getName());
session = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
- MessageConsumer consumer = session.createDurableSubscriber(destination, getName());
+ MessageConsumer consumer = session.createDurableSubscriber(destination, getName());
consumer.setMessageListener(this);
connection.start();
-
+
started .countDown();
-
+
} catch (JMSException exception) {
log.error("unexpected ex in consumer run", exception);
exceptions.add(exception);
}
}
-
+
public void onMessage(Message message) {
try {
counter++;
@@ -226,8 +230,8 @@ public class AMQ2021Test extends TestCas
recoveries++;
} else {
message.acknowledge();
- }
-
+ }
+
if (counter % 200 == 0) {
log.info("recoveries:" + recoveries + ", Received " +
counter + ", counter'th " + message);
}
@@ -237,7 +241,7 @@ public class AMQ2021Test extends TestCas
exceptions.add(e);
}
}
-
+
}
public void onException(JMSException exception) {