Author: gtully
Date: Mon Nov 23 18:46:46 2009
New Revision: 883459
URL: http://svn.apache.org/viewvc?rev=883459&view=rev
Log:
svn merge -c 883458 - resolve
https://issues.apache.org/activemq/browse/AMQ-2489 - duplicate delivery acks
resulted in broker exceptions with client or individual ack - delivery acks now
only for unacked messages - down side is pending messages in broker remain on
expiry awaiting ack from ackLater that depends on prefetch value - but this is
reasonable and to be expected. they will be removed on close or subsequent acks
in any event
Added:
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2489Test.java
- copied unchanged from r883458,
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2489Test.java
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=883459&r1=883458&r2=883459&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Mon Nov 23 18:46:46 2009
@@ -831,7 +831,13 @@
} else if (isAutoAcknowledgeBatch()) {
ackLater(md, MessageAck.STANDARD_ACK_TYPE);
} else if
(session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
- ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
+ boolean messageUnackedByConsumer = false;
+ synchronized (deliveredMessages) {
+ messageUnackedByConsumer = deliveredMessages.contains(md);
+ }
+ if (messageUnackedByConsumer) {
+ ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
+ }
}
else {
throw new IllegalStateException("Invalid session state.");
Modified:
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java?rev=883459&r1=883458&r2=883459&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
Mon Nov 23 18:46:46 2009
@@ -159,7 +159,8 @@
// first ack delivered after expiry
public void testExpiredMessagesWithVerySlowConsumer() throws Exception {
createBroker();
- ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory("tcp://localhost:61616");
+ final long queuePrefetch = 600;
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory("tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch="
+ queuePrefetch);
connection = factory.createConnection();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
producer = session.createProducer(destination);
@@ -222,7 +223,7 @@
assertTrue("all dispatched up to default prefetch ", Wait.waitFor(new
Wait.Condition() {
public boolean isSatisified() throws Exception {
- return 1000 == view.getDispatchCount();
+ return queuePrefetch == view.getDispatchCount();
}
}));
assertTrue("All sent have expired ", Wait.waitFor(new Wait.Condition()
{
@@ -240,17 +241,29 @@
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
- return 0 == view.getInFlightCount();
+ // consumer ackLater(delivery ack for expired messages) is
based on half the prefetch value
+ // which will leave half of the prefetch pending till consumer
close
+ return (queuePrefetch/2) -1 == view.getInFlightCount();
}
});
LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" +
view.getDequeueCount()
+ ", inflight=" + view.getInFlightCount() + ", expired= " +
view.getExpiredCount()
+ ", size= " + view.getQueueSize());
- assertEquals("prefetch gets back to 0 ", 0, view.getInFlightCount());
+
+
+ assertEquals("inflight reduces to half prefetch minus single delivered
message", (queuePrefetch/2) -1, view.getInFlightCount());
assertEquals("size gets back to 0 ", 0, view.getQueueSize());
assertEquals("dequeues match sent/expired ", sendCount,
view.getDequeueCount());
consumer.close();
+
+ Wait.waitFor(new Wait.Condition() {
+ public boolean isSatisified() throws Exception {
+ return 0 == view.getInFlightCount();
+ }
+ });
+ assertEquals("inflight goes to zeor on close", 0,
view.getInFlightCount());
+
LOG.info("done: " + getName());
}