Author: gtully
Date: Fri Jan 30 11:57:06 2009
New Revision: 739249
URL: http://svn.apache.org/viewvc?rev=739249&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-1730 - add some more
tests and remove workaround for prefetch=0, relates to
https://issues.apache.org/activemq/browse/AMQ-2087
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=739249&r1=739248&r2=739249&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Fri Jan 30 11:57:06 2009
@@ -230,8 +230,6 @@
public void afterRollback() throws
Exception {
synchronized(dispatchLock) {
- // ActiveMQ workaround for
AMQ-1730 - Please Ignore next line
-
node.incrementRedeliveryCounter();
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
}
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=739249&r1=739248&r2=739249&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
Fri Jan 30 11:57:06 2009
@@ -630,10 +630,11 @@
Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
destination = createDestination(session,
ActiveMQDestination.QUEUE_TYPE);
- sendMessages(connection, destination, 1);
+ sendMessages(connection, destination, 2);
MessageConsumer consumer = session.createConsumer(destination);
assertNotNull(consumer.receive(1000));
+ assertNotNull(consumer.receive(1000));
// install another consumer while message dispatch is
unacked/uncommitted
Session redispatchSession = connection.createSession(true,
Session.SESSION_TRANSACTED);
@@ -645,8 +646,12 @@
Message msg = redispatchConsumer.receive(1000);
assertNotNull(msg);
assertTrue(msg.getJMSRedelivered());
- // should have re-delivery of 2, one for re-dispatch, one for rollback
which is a little too much!
- assertEquals(3, msg.getLongProperty("JMSXDeliveryCount"));
+ assertEquals(2, msg.getLongProperty("JMSXDeliveryCount"));
+
+ msg = redispatchConsumer.receive(1000);
+ assertNotNull(msg);
+ assertTrue(msg.getJMSRedelivered());
+ assertEquals(2, msg.getLongProperty("JMSXDeliveryCount"));
redispatchSession.commit();
assertNull(redispatchConsumer.receive(500));
@@ -660,10 +665,11 @@
Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
destination = createDestination(session,
ActiveMQDestination.QUEUE_TYPE);
- sendMessages(connection, destination, 1);
+ sendMessages(connection, destination, 2);
MessageConsumer consumer = session.createConsumer(destination);
assertNotNull(consumer.receive(1000));
+ assertNotNull(consumer.receive(1000));
// install another consumer while message dispatch is
unacked/uncommitted
Session redispatchSession = connection.createSession(true,
Session.SESSION_TRANSACTED);
@@ -675,8 +681,11 @@
Message msg = redispatchConsumer.receive(1000);
assertNotNull(msg);
assertTrue(msg.getJMSRedelivered());
- // should have re-delivery of 2, one for re-dispatch, one for rollback
which is a little too much!
- assertEquals(3, msg.getLongProperty("JMSXDeliveryCount"));
+ assertEquals(2, msg.getLongProperty("JMSXDeliveryCount"));
+ msg = redispatchConsumer.receive(1000);
+ assertNotNull(msg);
+ assertTrue(msg.getJMSRedelivered());
+ assertEquals(2, msg.getLongProperty("JMSXDeliveryCount"));
redispatchSession.commit();
assertNull(redispatchConsumer.receive(500));
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java?rev=739249&r1=739248&r2=739249&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
Fri Jan 30 11:57:06 2009
@@ -38,6 +38,7 @@
protected static final Log LOG =
LogFactory.getLog(JmsRollbackRedeliveryTest.class);
final int nbMessages = 10;
final String destinationName = "Destination";
+ final String brokerUrl = "vm://localhost?create=false";
boolean consumerClose = true;
boolean rollback = true;
@@ -52,13 +53,21 @@
public void testRedelivery() throws Exception {
- doTestRedelivery("vm://localhost", false);
+ doTestRedelivery(brokerUrl, false);
}
public void testRedeliveryWithInterleavedProducer() throws Exception {
- doTestRedelivery("vm://localhost", true);
+ doTestRedelivery(brokerUrl, true);
}
+ public void testRedeliveryWithPrefetch0() throws Exception {
+ doTestRedelivery(brokerUrl + "?jms.prefetchPolicy.queuePrefetch=0",
true);
+ }
+
+ public void testRedeliveryWithPrefetch1() throws Exception {
+ doTestRedelivery(brokerUrl + "?jms.prefetchPolicy.queuePrefetch=1",
true);
+ }
+
public void doTestRedelivery(String brokerUrl, boolean interleaveProducer)
throws Exception {
final int nbMessages = 10;
@@ -88,9 +97,11 @@
if (msg != null && rolledback.put(msg.getText(),
Boolean.TRUE) != null) {
LOG.info("Received message " + msg.getText() + " (" +
received.getAndIncrement() + ")" + msg.getJMSMessageID());
assertTrue(msg.getJMSRedelivered());
+ assertEquals(2,
msg.getLongProperty("JMSXDeliveryCount"));
session.commit();
} else {
LOG.info("Rollback message " + msg.getText() + " id: "
+ msg.getJMSMessageID());
+ assertFalse(msg.getJMSRedelivered());
session.rollback();
}
}
@@ -102,7 +113,8 @@
public void testRedeliveryOnSingleConsumer() throws Exception {
- ConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("vm://localhost");
+ ConnectionFactory connectionFactory =
+ new ActiveMQConnectionFactory(brokerUrl);
Connection connection = connectionFactory.createConnection();
connection.start();
@@ -135,7 +147,8 @@
public void testRedeliveryOnSingleSession() throws Exception {
- ConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("vm://localhost");
+ ConnectionFactory connectionFactory =
+ new ActiveMQConnectionFactory(brokerUrl);
Connection connection = connectionFactory.createConnection();
connection.start();
@@ -168,7 +181,8 @@
public void testRedeliveryOnSessionCloseWithNoRollback() throws Exception {
- ConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("vm://localhost");
+ ConnectionFactory connectionFactory =
+ new ActiveMQConnectionFactory(brokerUrl);
Connection connection = connectionFactory.createConnection();
connection.start();
@@ -195,6 +209,37 @@
}
}
+ public void testRedeliveryPropertyWithNoRollback() throws Exception {
+ ConnectionFactory connectionFactory =
+ new ActiveMQConnectionFactory(brokerUrl);
+ Connection connection = connectionFactory.createConnection();
+ connection.start();
+
+ populateDestination(nbMessages, destinationName, connection);
+ connection.close();
+
+ {
+ AtomicInteger received = new AtomicInteger();
+ while (received.get() < nbMessages) {
+ connection = connectionFactory.createConnection();
+ connection.start();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue(destinationName);
+
+ MessageConsumer consumer =
session.createConsumer(destination);
+ TextMessage msg = (TextMessage) consumer.receive(2000);
+ if (msg != null) {
+ LOG.info("Received message " + msg.getText() +
+ " (" + received.getAndIncrement() + ")" +
msg.getJMSMessageID());
+ assertFalse(msg.getJMSRedelivered());
+ assertEquals(1, msg.getLongProperty("JMSXDeliveryCount"));
+ }
+ session.close();
+ connection.close();
+ }
+ }
+ }
+
private void populateDestination(final int nbMessages,
final String destinationName, Connection connection)
throws JMSException {
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java?rev=739249&r1=739248&r2=739249&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java
Fri Jan 30 11:57:06 2009
@@ -55,7 +55,7 @@
session.commit();
assertNotNull(message);
assertEquals("redelivered message", id, message.getJMSMessageID());
- assertEquals(3, message.getLongProperty("JMSXDeliveryCount"));
+ assertEquals(2, message.getLongProperty("JMSXDeliveryCount"));
}
public void
testVerifyConsumerAndSessionCloseRedeliveryWithFailoverTransport() throws
Throwable {
@@ -75,7 +75,7 @@
session.commit();
assertNotNull(message);
assertEquals("redelivered message", id, message.getJMSMessageID());
- assertEquals(3, message.getLongProperty("JMSXDeliveryCount"));
+ assertEquals(2, message.getLongProperty("JMSXDeliveryCount"));
}
public void
testVerifyConsumerCloseSessionRollbackRedeliveryWithFailoverTransport() throws
Throwable {
@@ -94,7 +94,7 @@
session.commit();
assertNotNull(message);
assertEquals("redelivered message", id, message.getJMSMessageID());
- assertEquals(3, message.getLongProperty("JMSXDeliveryCount"));
+ assertEquals(2, message.getLongProperty("JMSXDeliveryCount"));
}
protected void setUp() throws Exception {