Author: gtully
Date: Wed Feb 15 11:21:43 2012
New Revision: 1244443
URL: http://svn.apache.org/viewvc?rev=1244443&view=rev
Log:
fix intermittent failure, consume messages from all destinations, sorts out the
composite case so that there are no dangling messages
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
activemq/trunk/activemq-core/src/test/resources/log4j.properties
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java?rev=1244443&r1=1244442&r2=1244443&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
Wed Feb 15 11:21:43 2012
@@ -519,21 +519,25 @@ public class XARecoveryBrokerTest extend
connection.send(message);
}
- // Setup the consumer and receive the message.
- ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo,
destination);
- connection.send(consumerInfo);
-
// Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
- Message m = null;
- for (int i = 0; i < 4; i++) {
- m = receiveMessage(connection);
- assertNotNull(m);
+
+ Message message = null;
+ for (ActiveMQDestination dest : destinationList(destination)) {
+ // Setup the consumer and receive the message.
+ ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, dest);
+ connection.send(consumerInfo);
+
+ for (int i = 0; i < 4; i++) {
+ message = receiveMessage(connection);
+ assertNotNull(message);
+ }
+ MessageAck ack = createAck(consumerInfo, message, 4,
MessageAck.STANDARD_ACK_TYPE);
+ ack.setTransactionId(txid);
+ connection.send(ack);
}
- MessageAck ack = createAck(consumerInfo, m, 4,
MessageAck.STANDARD_ACK_TYPE);
- ack.setTransactionId(txid);
- connection.send(ack);
+
// Don't commit
// restart the broker.
@@ -545,13 +549,16 @@ public class XARecoveryBrokerTest extend
sessionInfo = createSessionInfo(connectionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
- consumerInfo = createConsumerInfo(sessionInfo, destination);
- connection.send(consumerInfo);
- // All messages should be re-delivered.
- for (int i = 0; i < 4; i++) {
- m = receiveMessage(connection);
- assertNotNull(m);
+ for (ActiveMQDestination dest : destinationList(destination)) {
+ // Setup the consumer and receive the message.
+ ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, dest);
+ connection.send(consumerInfo);
+
+ for (int i = 0; i < 4; i++) {
+ message = receiveMessage(connection);
+ assertNotNull(message);
+ }
}
assertNoMessagesLeft(connection);
Modified: activemq/trunk/activemq-core/src/test/resources/log4j.properties
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/log4j.properties?rev=1244443&r1=1244442&r2=1244443&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/log4j.properties (original)
+++ activemq/trunk/activemq-core/src/test/resources/log4j.properties Wed Feb 15
11:21:43 2012
@@ -22,7 +22,7 @@ log4j.rootLogger=INFO, out, stdout
#log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
#log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG
-log4j.logger.org.apache.activemq.transport.failover=TRACE
+#log4j.logger.org.apache.activemq.transport.failover=TRACE
#log4j.logger.org.apache.activemq.store.jdbc=TRACE
#log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG
#log4j.logger.org.apache.activemq.store.jdbc.JDBCMessageStore=DEBUG