Author: tabish
Date: Mon Mar 28 20:10:41 2011
New Revision: 1086378
URL: http://svn.apache.org/viewvc?rev=1086378&view=rev
Log:
Update the test case so that its not dependent on port 61616
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java?rev=1086378&r1=1086377&r2=1086378&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
Mon Mar 28 20:10:41 2011
@@ -50,7 +50,7 @@ import static org.apache.activemq.TestSu
public class ExpiredMessagesTest extends CombinationTestSupport {
private static final Logger LOG =
LoggerFactory.getLogger(ExpiredMessagesTest.class);
-
+
BrokerService broker;
Connection connection;
Session session;
@@ -60,7 +60,8 @@ public class ExpiredMessagesTest extends
public ActiveMQDestination dlqDestination = new
ActiveMQQueue("ActiveMQ.DLQ");
public boolean useTextMessage = true;
public boolean useVMCursor = true;
-
+ protected String brokerUri;
+
public static Test suite() {
return suite(ExpiredMessagesTest.class);
}
@@ -68,78 +69,79 @@ public class ExpiredMessagesTest extends
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
-
- protected void setUp() throws Exception {
+
+ protected void setUp() throws Exception {
final boolean deleteAllMessages = true;
broker = createBroker(deleteAllMessages, 100);
+ brokerUri =
broker.getTransportConnectors().get(0).getPublishableConnectString();
}
-
- public void testExpiredMessages() throws Exception {
-
- ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory("tcp://localhost:61616");
- connection = factory.createConnection();
- session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
- producer = session.createProducer(destination);
- producer.setTimeToLive(100);
- consumer = session.createConsumer(destination);
- connection.start();
- final AtomicLong received = new AtomicLong();
-
- Thread consumerThread = new Thread("Consumer Thread") {
- public void run() {
- long start = System.currentTimeMillis();
- try {
- long end = System.currentTimeMillis();
- while (end - start < 3000) {
- if (consumer.receive(1000) !=
null) {
- received.incrementAndGet();
- }
- Thread.sleep(100);
- end =
System.currentTimeMillis();
- }
- consumer.close();
- } catch (Throwable ex) {
- ex.printStackTrace();
- }
- }
- };
-
+
+ public void testExpiredMessages() throws Exception {
+
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(brokerUri);
+ connection = factory.createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producer = session.createProducer(destination);
+ producer.setTimeToLive(100);
+ consumer = session.createConsumer(destination);
+ connection.start();
+ final AtomicLong received = new AtomicLong();
+
+ Thread consumerThread = new Thread("Consumer Thread") {
+ public void run() {
+ long start = System.currentTimeMillis();
+ try {
+ long end = System.currentTimeMillis();
+ while (end - start < 3000) {
+ if (consumer.receive(1000) != null) {
+ received.incrementAndGet();
+ }
+ Thread.sleep(100);
+ end = System.currentTimeMillis();
+ }
+ consumer.close();
+ } catch (Throwable ex) {
+ ex.printStackTrace();
+ }
+ }
+ };
+
consumerThread.start();
-
- final int numMessagesToSend = 10000;
- Thread producingThread = new Thread("Producing Thread") {
+
+ final int numMessagesToSend = 10000;
+ Thread producingThread = new Thread("Producing Thread") {
public void run() {
try {
- int i = 0;
- while (i++ < numMessagesToSend) {
-
producer.send(session.createTextMessage("test"));
- }
- producer.close();
+ int i = 0;
+ while (i++ < numMessagesToSend) {
+ producer.send(session.createTextMessage("test"));
+ }
+ producer.close();
} catch (Throwable ex) {
ex.printStackTrace();
}
}
- };
-
- producingThread.start();
-
+ };
+
+ producingThread.start();
+
consumerThread.join();
producingThread.join();
session.close();
-
+
final DestinationStatistics view = getDestinationStatistics(broker,
destination);
// wait for all to inflight to expire
assertTrue("all inflight messages expired ", Wait.waitFor(new
Wait.Condition() {
public boolean isSatisified() throws Exception {
return view.getInflight().getCount() == 0;
- }
+ }
}));
assertEquals("Wrong inFlightCount: ", 0,
view.getInflight().getCount());
-
+
LOG.info("Stats: received: " + received.get() + ", enqueues: " +
view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount()
+ ", dispatched: " + view.getDispatched().getCount() + ",
inflight: " + view.getInflight().getCount() + ", expiries: " +
view.getExpired().getCount());
-
+
// wait for all sent to get delivered and expire
assertTrue("all sent messages expired ", Wait.waitFor(new
Wait.Condition() {
public boolean isSatisified() throws Exception {
@@ -148,15 +150,15 @@ public class ExpiredMessagesTest extends
LOG.info("Stats: received: " + received.get() + ", size= " +
view.getMessages().getCount() + ", enqueues: " + view.getDequeues().getCount()
+ ", dequeues: " + view.getDequeues().getCount()
+ ", dispatched: " + view.getDispatched().getCount() +
", inflight: " + view.getInflight().getCount() + ", expiries: " +
view.getExpired().getCount());
return oldEnqueues == view.getEnqueues().getCount();
- }
+ }
}, 60*1000));
-
+
LOG.info("Stats: received: " + received.get() + ", size= " +
view.getMessages().getCount() + ", enqueues: " + view.getEnqueues().getCount()
+ ", dequeues: " + view.getDequeues().getCount()
+ ", dispatched: " + view.getDispatched().getCount() + ",
inflight: " + view.getInflight().getCount() + ", expiries: " +
view.getExpired().getCount());
-
+
assertTrue("got at least what did not expire", received.get() >=
view.getDequeues().getCount() - view.getExpired().getCount());
-
+
assertTrue("all messages expired - queue size gone to zero " +
view.getMessages().getCount(), Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
LOG.info("Stats: received: " + received.get() + ", size= " +
view.getMessages().getCount() + ", enqueues: " + view.getEnqueues().getCount()
+ ", dequeues: " + view.getDequeues().getCount()
@@ -164,61 +166,61 @@ public class ExpiredMessagesTest extends
return view.getMessages().getCount() == 0;
}
}));
-
+
final long expiredBeforeEnqueue = numMessagesToSend -
view.getEnqueues().getCount();
final long totalExpiredCount = view.getExpired().getCount() +
expiredBeforeEnqueue;
-
+
final DestinationStatistics dlqView = getDestinationStatistics(broker,
dlqDestination);
LOG.info("DLQ stats: size= " + dlqView.getMessages().getCount() + ",
enqueues: " + dlqView.getDequeues().getCount() + ", dequeues: " +
dlqView.getDequeues().getCount()
+ ", dispatched: " + dlqView.getDispatched().getCount() + ",
inflight: " + dlqView.getInflight().getCount() + ", expiries: " +
dlqView.getExpired().getCount());
-
+
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return totalExpiredCount == dlqView.getMessages().getCount();
}
});
assertEquals("dlq contains all expired", totalExpiredCount,
dlqView.getMessages().getCount());
-
+
// memory check
assertEquals("memory usage is back to duck egg", 0,
getDestination(broker, destination).getMemoryUsage().getPercentUsage());
- assertTrue("memory usage is increased ", 0 < getDestination(broker,
dlqDestination).getMemoryUsage().getPercentUsage());
-
+ assertTrue("memory usage is increased ", 0 < getDestination(broker,
dlqDestination).getMemoryUsage().getPercentUsage());
+
// verify DLQ
MessageConsumer dlqConsumer = createDlqConsumer(connection);
final DLQListener dlqListener = new DLQListener();
dlqConsumer.setMessageListener(dlqListener);
-
+
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return totalExpiredCount == dlqListener.count;
}
}, 60 * 1000);
-
+
assertEquals("dlq returned all expired", dlqListener.count,
totalExpiredCount);
- }
+ }
class DLQListener implements MessageListener {
-
+
int count = 0;
-
+
public void onMessage(Message message) {
count++;
}
-
+
};
-
- private MessageConsumer createDlqConsumer(Connection connection) throws
Exception {
- return connection.createSession(false,
Session.AUTO_ACKNOWLEDGE).createConsumer(dlqDestination);
+
+ private MessageConsumer createDlqConsumer(Connection connection) throws
Exception {
+ return connection.createSession(false,
Session.AUTO_ACKNOWLEDGE).createConsumer(dlqDestination);
}
public void initCombosForTestRecoverExpiredMessages() {
- addCombinationValues("useVMCursor", new Object[] {Boolean.TRUE,
Boolean.FALSE});
- }
-
- public void testRecoverExpiredMessages() throws Exception {
+ addCombinationValues("useVMCursor", new Object[] {Boolean.TRUE,
Boolean.FALSE});
+ }
+
+ public void testRecoverExpiredMessages() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
- "failover://tcp://localhost:61616");
+ "failover://"+brokerUri);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -247,7 +249,7 @@ public class ExpiredMessagesTest extends
producingThread.join();
DestinationStatistics view = getDestinationStatistics(broker,
destination);
- LOG.info("Stats: size: " + view.getMessages().getCount() + ",
enqueues: "
+ LOG.info("Stats: size: " + view.getMessages().getCount() + ",
enqueues: "
+ view.getEnqueues().getCount() + ", dequeues: "
+ view.getDequeues().getCount() + ", dispatched: "
+ view.getDispatched().getCount() + ", inflight: "
@@ -263,7 +265,7 @@ public class ExpiredMessagesTest extends
LOG.info("recovering broker");
final boolean deleteAllMessages = false;
broker = createBroker(deleteAllMessages, 5000);
-
+
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
DestinationStatistics view = getDestinationStatistics(broker,
destination);
@@ -273,25 +275,25 @@ public class ExpiredMessagesTest extends
+ view.getDispatched().getCount() + ", inflight: "
+ view.getInflight().getCount() + ", expiries: "
+ view.getExpired().getCount());
-
+
return view.getMessages().getCount() == 0;
}
});
-
+
view = getDestinationStatistics(broker, destination);
assertEquals("Expect empty queue, QueueSize: ", 0,
view.getMessages().getCount());
assertEquals("all dequeues were expired",
view.getDequeues().getCount(), view.getExpired().getCount());
}
- private BrokerService createBroker(boolean deleteAllMessages, long
expireMessagesPeriod) throws Exception {
- BrokerService broker = new BrokerService();
+ private BrokerService createBroker(boolean deleteAllMessages, long
expireMessagesPeriod) throws Exception {
+ BrokerService broker = new BrokerService();
broker.setBrokerName("localhost");
broker.setDestinations(new ActiveMQDestination[]{destination});
AMQPersistenceAdapter adaptor = new AMQPersistenceAdapter();
adaptor.setDirectory(new File("target/expiredtest-data/"));
adaptor.setForceRecoverReferenceStore(true);
broker.setPersistenceAdapter(adaptor);
-
+
PolicyEntry defaultPolicy = new PolicyEntry();
if (useVMCursor) {
defaultPolicy.setPendingQueuePolicy(new
VMPendingQueueMessageStoragePolicy());
@@ -302,17 +304,17 @@ public class ExpiredMessagesTest extends
policyMap.setDefaultEntry(defaultPolicy);
broker.setDestinationPolicy(policyMap);
broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
- broker.addConnector("tcp://localhost:61616");
+ broker.addConnector("tcp://localhost:0");
broker.start();
broker.waitUntilStarted();
return broker;
- }
-
-
-
- protected void tearDown() throws Exception {
- connection.stop();
- broker.stop();
- broker.waitUntilStopped();
- }
+ }
+
+
+
+ protected void tearDown() throws Exception {
+ connection.stop();
+ broker.stop();
+ broker.waitUntilStopped();
+ }
}