Author: chirino
Date: Wed Sep 24 07:46:00 2008
New Revision: 698595
URL: http://svn.apache.org/viewvc?rev=698595&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-1951
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=698595&r1=698594&r2=698595&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Wed Sep 24 07:46:00 2008
@@ -125,6 +125,8 @@
private MessageTransformer transformer;
private boolean clearDispatchList;
+ private MessageAck pendingAck;
+
/**
* Create a MessageConsumer
*
@@ -615,6 +617,8 @@
ackCounter = 0;
}
}
+ } else {
+ ack = pendingAck;
}
if (ack != null) {
final MessageAck ackToSend = ack;
@@ -835,13 +839,19 @@
// The delivered message list is only needed for the recover method
// which is only used with client ack.
deliveredCounter++;
+
+ MessageAck oldPendingAck = pendingAck;
+ pendingAck = new MessageAck(md, ackType, deliveredCounter);
+ if( oldPendingAck==null ) {
+ pendingAck.setFirstMessageId(pendingAck.getLastMessageId());
+ } else {
+ pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
+ }
+ pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
+
if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
- MessageAck ack = new MessageAck(md, ackType, deliveredCounter);
- if( !deliveredMessages.isEmpty() ) {
- ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
- }
- ack.setTransactionId(session.getTransactionContext().getTransactionId());
- session.sendAck(ack);
+ session.sendAck(pendingAck);
+ pendingAck=null;
additionalWindowSize = deliveredCounter;
// When using DUPS ok, we do a real ack.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=698595&r1=698594&r2=698595&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Sep 24 07:46:00 2008
@@ -216,18 +216,8 @@
throws Exception {
synchronized(dispatchLock) {
dequeueCounter++;
- node
- .getRegionDestination()
- .getDestinationStatistics()
- .getDequeues()
- .increment();
-
- node
- .getRegionDestination()
- .getDestinationStatistics()
- .getInflight()
- .decrement();
-
+ node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
+ node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
prefetchExtension--;
}
}
@@ -236,6 +226,9 @@
// Need to put it back in the front.
synchronized(dispatchLock) {
dispatched.add(0, node);
+ // ActiveMQ workaround for AMQ-1730 - Please Ignore next line
+ node.incrementRedeliveryCounter();
+ node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
}
}
});
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=698595&r1=698594&r2=698595&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Wed Sep 24 07:46:00 2008
@@ -509,22 +509,21 @@
sendMessages(session, destination, 2);
session.commit();
- // Only pick up the first message.
- Message message1 = consumer.receive(1000);
- assertNotNull(message1);
-
- // Don't acknowledge yet. This should keep our prefetch full.
+ // The prefetch should fill up with 1 message.
// Since prefetch is still full, the 2nd message should get dispatched
- // to
- // another consumer.. lets create the 2nd consumer test that it does
+ // to another consumer.. lets create the 2nd consumer test that it does
// make sure it does.
ActiveMQConnection connection2 = (ActiveMQConnection)factory.createConnection();
connections.add(connection2);
Session session2 = connection2.createSession(true, 0);
- session2.createConsumer(destination);
+ MessageConsumer consumer2 = session2.createConsumer(destination);
+
+ // Pick up the first message.
+ Message message1 = consumer.receive(1000);
+ assertNotNull(message1);
- // Only pick up the 2nd messages.
- Message message2 = consumer.receive(1000);
+ // Pick up the 2nd messages.
+ Message message2 = consumer2.receive(1000);
assertNotNull(message2);
session.commit();