Author: gtully
Date: Fri Jan 30 15:30:24 2009
New Revision: 739307
URL: http://svn.apache.org/viewvc?rev=739307&view=rev
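Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1593: remove the node.incrementRedeliveryCounter() call from the ack-range handling in PrefetchSubscription, and add tests asserting that JMSXDeliveryCount matches the number of deliveries after each rollback. Also raise the receive timeout in SimpleNetworkTest from 500 ms to 1000 ms and add a message to its assertion.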
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=739307&r1=739306&r2=739307&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Jan 30 15:30:24 2009
@@ -316,7 +316,6 @@
inAckRange = true;
}
if (inAckRange) {
- node.incrementRedeliveryCounter();
if (ack.getLastMessageId().equals(messageId)) {
destination = node.getRegionDestination();
callDispatchMatched = true;
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java?rev=739307&r1=739306&r2=739307&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java Fri Jan 30 15:30:24 2009
@@ -70,9 +70,6 @@
public void doTestRedelivery(String brokerUrl, boolean interleaveProducer)
throws Exception {
- final int nbMessages = 10;
- final String destinationName = "Destination";
-
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
Connection connection = connectionFactory.createConnection();
@@ -179,36 +176,66 @@
}
}
- public void testRedeliveryOnSessionCloseWithNoRollback() throws Exception {
+ // AMQ-1593
+ public void testValidateRedeliveryCountOnRollback() throws Exception {
- ConnectionFactory connectionFactory =
+ final int numMessages = 1;
+ ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(brokerUrl);
Connection connection = connectionFactory.createConnection();
connection.start();
- populateDestination(nbMessages, destinationName, connection);
+ populateDestination(numMessages, destinationName, connection);
{
AtomicInteger received = new AtomicInteger();
- Map<String, Boolean> rolledback = new ConcurrentHashMap<String, Boolean>();
- while (received.get() < nbMessages) {
- Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ final int maxRetries = new RedeliveryPolicy().getMaximumRedeliveries();
+ while (received.get() < maxRetries) {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Destination destination = session.createQueue(destinationName);
MessageConsumer consumer = session.createConsumer(destination);
TextMessage msg = (TextMessage) consumer.receive(1000);
if (msg != null) {
- if (msg != null && rolledback.put(msg.getText(), Boolean.TRUE) != null) {
- LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
- assertTrue(msg.getJMSRedelivered());
- session.commit();
- }
+ LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
+ assertEquals("redelivery property matches deliveries", received.get(), msg.getLongProperty("JMSXDeliveryCount"));
+ session.rollback();
}
session.close();
}
}
}
+ // AMQ-1593
+ public void testValidateRedeliveryCountOnRollbackWithPrefetch0() throws Exception {
+
+ final int numMessages = 1;
+ ConnectionFactory connectionFactory =
+ new ActiveMQConnectionFactory(brokerUrl + "?jms.prefetchPolicy.queuePrefetch=0");
+ Connection connection = connectionFactory.createConnection();
+ connection.start();
+
+ populateDestination(numMessages, destinationName, connection);
+
+ {
+ AtomicInteger received = new AtomicInteger();
+ final int maxRetries = new RedeliveryPolicy().getMaximumRedeliveries();
+ while (received.get() < maxRetries) {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Destination destination = session.createQueue(destinationName);
+
+ MessageConsumer consumer = session.createConsumer(destination);
+ TextMessage msg = (TextMessage) consumer.receive(1000);
+ if (msg != null) {
+ LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
+ assertEquals("redelivery property matches deliveries", received.get(), msg.getLongProperty("JMSXDeliveryCount"));
+ session.rollback();
+ }
+ session.close();
+ }
+ }
+ }
+
public void testRedeliveryPropertyWithNoRollback() throws Exception {
ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(brokerUrl);
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java?rev=739307&r1=739306&r2=739307&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java Fri Jan 30 15:30:24 2009
@@ -140,7 +140,7 @@
doSetUp();
remoteConsumer = remoteSession.createDurableSubscriber(included, consumerName);
for (int i = 0; i < MESSAGE_COUNT; i++) {
- assertNotNull(remoteConsumer.receive(500));
+ assertNotNull("message count: " + i, remoteConsumer.receive(1000));
}
}