Author: gtully
Date: Thu Feb 19 18:15:59 2009
New Revision: 745953
URL: http://svn.apache.org/viewvc?rev=745953&view=rev
Log:
tidy up redispatch logic a little more; resolve AMQ-2128: deliver acks on
dispose in auto_ack mode. Also get some closure on AMQ-2075.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=745953&r1=745952&r2=745953&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Thu Feb 19 18:15:59 2009
@@ -630,7 +630,7 @@
void deliverAcks() {
MessageAck ack = null;
if (deliveryingAcknowledgements.compareAndSet(false, true)) {
- if (this.optimizeAcknowledge) {
+ if (session.isAutoAcknowledge()) {
synchronized(deliveredMessages) {
ack =
makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
if (ack != null) {
@@ -775,14 +775,12 @@
if (session.getTransacted()) {
// Do nothing.
} else if (session.isAutoAcknowledge()) {
- synchronized (deliveredMessages) {
- if (!deliveredMessages.isEmpty()) {
- if (optimizeAcknowledge) {
- if (deliveryingAcknowledgements.compareAndSet(
- false, true)) {
+ if (deliveryingAcknowledgements.compareAndSet(false, true)) {
+ synchronized (deliveredMessages) {
+ if (!deliveredMessages.isEmpty()) {
+ if (optimizeAcknowledge) {
ackCounter++;
- if (ackCounter >= (info
- .getCurrentPrefetchSize() * .65)) {
+ if (ackCounter >=
(info.getCurrentPrefetchSize() * .65)) {
MessageAck ack =
makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
if (ack != null) {
deliveredMessages.clear();
@@ -790,16 +788,16 @@
session.sendAck(ack);
}
}
- deliveryingAcknowledgements.set(false);
- }
- } else {
- MessageAck ack =
makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
- if (ack!=null) {
- deliveredMessages.clear();
- session.sendAck(ack);
+ } else {
+ MessageAck ack =
makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
+ if (ack!=null) {
+ deliveredMessages.clear();
+ session.sendAck(ack);
+ }
}
}
}
+ deliveryingAcknowledgements.set(false);
}
} else if (session.isDupsOkAcknowledge()) {
ackLater(md, MessageAck.STANDARD_ACK_TYPE);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=745953&r1=745952&r2=745953&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu Feb 19 18:15:59 2009
@@ -336,8 +336,7 @@
}
}
ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
- MessageGroupSet ownedGroups = getMessageGroupOwners()
- .removeConsumer(consumerId);
+ getMessageGroupOwners().removeConsumer(consumerId);
// redeliver inflight messages
List<QueueMessageReference> list = new
ArrayList<QueueMessageReference>();
@@ -353,19 +352,10 @@
list.add(qmr);
}
- if (!list.isEmpty() && !consumers.isEmpty()) {
+ if (!list.isEmpty()) {
doDispatch(list);
}
}
- //if it is a last consumer (and not a browser) dispatch all
pagedIn messages
- if (consumers.isEmpty() && !(sub instanceof
QueueBrowserSubscription)) {
- List<QueueMessageReference> list = new
ArrayList<QueueMessageReference>();
- for (QueueMessageReference ref :
pagedInMessages.values()) {
- list.add(ref);
- }
- pagedInPendingDispatch.clear();
- doDispatch(list);
- }
if (!(this.optimizedDispatch || isSlave())) {
wakeup();
}
@@ -1068,7 +1058,7 @@
}
synchronized (messages) {
- pageInMoreMessages = !messages.isEmpty();
+ pageInMoreMessages |= !messages.isEmpty();
}
// Kinda ugly.. but I think dispatchLock is the only mutex
protecting the
@@ -1333,14 +1323,18 @@
* were not full.
*/
private List<QueueMessageReference>
doActualDispatch(List<QueueMessageReference> list) throws Exception {
- List<QueueMessageReference> rc = new
ArrayList<QueueMessageReference>(list.size());
- Set<Subscription> fullConsumers = new
HashSet<Subscription>(this.consumers.size());
List<Subscription> consumers;
synchronized (this.consumers) {
+ if (this.consumers.isEmpty()) {
+ return list;
+ }
consumers = new ArrayList<Subscription>(this.consumers);
}
+ List<QueueMessageReference> rc = new
ArrayList<QueueMessageReference>(list.size());
+ Set<Subscription> fullConsumers = new
HashSet<Subscription>(this.consumers.size());
+
for (MessageReference node : list) {
Subscription target = null;
int interestCount=0;
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=745953&r1=745952&r2=745953&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
Thu Feb 19 18:15:59 2009
@@ -358,15 +358,99 @@
assertEquals(4, counter.get());
}
- public void
initCombosForTestMessageListenerUnackedWithPrefetch1StayInQueue() {
+ public void
initCombosForTestMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue()
{
addCombinationValues("deliveryMode", new Object[]
{Integer.valueOf(DeliveryMode.NON_PERSISTENT),
Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("ackMode", new Object[]
{Integer.valueOf(Session.AUTO_ACKNOWLEDGE),
Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE),
-
Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
+ addCombinationValues("ackMode", new Object[]
{Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE)});
addCombinationValues("destinationType", new Object[]
{Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)});
}
- public void testMessageListenerUnackedWithPrefetch1StayInQueue() throws
Exception {
+ public void
testMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws
Exception {
+
+ final AtomicInteger counter = new AtomicInteger(0);
+ final CountDownLatch sendDone = new CountDownLatch(1);
+ final CountDownLatch got2Done = new CountDownLatch(1);
+
+ // Set prefetch to 1
+ connection.getPrefetchPolicy().setAll(1);
+ // This test case does not work if optimized message dispatch is used
as
+ // the main thread send block until the consumer receives the
+ // message. This test depends on thread decoupling so that the main
+ // thread can stop the consumer thread.
+ connection.setOptimizedMessageDispatch(false);
+ connection.start();
+
+ // Use all the ack modes
+ Session session = connection.createSession(false, ackMode);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message m) {
+ try {
+ TextMessage tm = (TextMessage)m;
+ LOG.info("Got in first listener: " + tm.getText());
+ assertEquals("" + counter.get(), tm.getText());
+ counter.incrementAndGet();
+ if (counter.get() == 2) {
+ sendDone.await();
+ connection.close();
+ got2Done.countDown();
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ // Send the messages
+ sendMessages(session, destination, 4);
+ sendDone.countDown();
+ // Wait for first 2 messages to arrive.
+ assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS));
+
+ // Re-start connection.
+ connection = (ActiveMQConnection)factory.createConnection();
+ connections.add(connection);
+
+ connection.getPrefetchPolicy().setAll(1);
+ connection.start();
+
+ // Pickup the remaining messages.
+ final CountDownLatch done2 = new CountDownLatch(1);
+ session = connection.createSession(false, ackMode);
+ consumer = session.createConsumer(destination);
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message m) {
+ try {
+ TextMessage tm = (TextMessage)m;
+ LOG.info("Got in second listener: " + tm.getText());
+ // order is not guaranteed as the connection is started
before the listener is set.
+ // assertEquals("" + counter.get(), tm.getText());
+ counter.incrementAndGet();
+ if (counter.get() == 4) {
+ done2.countDown();
+ }
+ } catch (Throwable e) {
+ LOG.error("unexpected ex onMessage: ", e);
+ }
+ }
+ });
+
+ assertTrue(done2.await(1000, TimeUnit.MILLISECONDS));
+ Thread.sleep(200);
+
+ // assert msg 2 was redelivered as close() from onMessages() will only
ack in auto_ack mode
+ assertEquals(5, counter.get());
+ }
+
+ public void initCombosForTestMessageListenerAutoAckOnCloseWithPrefetch1() {
+ addCombinationValues("deliveryMode", new Object[]
{Integer.valueOf(DeliveryMode.NON_PERSISTENT),
Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("ackMode", new Object[]
{Integer.valueOf(Session.AUTO_ACKNOWLEDGE),
Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
+ addCombinationValues("destinationType", new Object[]
{Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)});
+ }
+
+ public void testMessageListenerAutoAckOnCloseWithPrefetch1() throws
Exception {
+
final AtomicInteger counter = new AtomicInteger(0);
final CountDownLatch sendDone = new CountDownLatch(1);
final CountDownLatch got2Done = new CountDownLatch(1);
@@ -426,13 +510,12 @@
try {
TextMessage tm = (TextMessage)m;
LOG.info("Got in second listener: " + tm.getText());
- assertEquals("" + counter.get(), tm.getText());
counter.incrementAndGet();
if (counter.get() == 4) {
done2.countDown();
}
} catch (Throwable e) {
- LOG.info("unexpected ex onMessage: ", e);
+ LOG.error("unexpected ex onMessage: ", e);
}
}
});
@@ -440,9 +523,9 @@
assertTrue(done2.await(1000, TimeUnit.MILLISECONDS));
Thread.sleep(200);
+ // close from onMessage with Auto_ack will ack
// Make sure only 4 messages were delivered.
assertEquals(4, counter.get());
-
}
public void initCombosForTestMessageListenerWithConsumerWithPrefetch1() {