Author: gtully
Date: Fri Mar 13 11:59:08 2009
New Revision: 753214
URL: http://svn.apache.org/viewvc?rev=753214&view=rev
Log:
resolve AMQ-2102|https://issues.apache.org/activemq/browse/AMQ-2102 - refactor
message dispatch on slave to take account of subscription choice on the master;
this ensures the slave is in sync w.r.t. outstanding acks.
processDispatchNotification is implemented by Queue-type destinations, which
delegate to the subscription after doing a dispatch; the test demonstrates slave
out-of-sync errors
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
Fri Mar 13 11:59:08 2009
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.broker.ft;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.Connection;
@@ -28,6 +30,7 @@
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
@@ -58,8 +61,9 @@
private static final Log LOG = LogFactory.getLog(MasterBroker.class);
private Transport slave;
private AtomicBoolean started = new AtomicBoolean(false);
- private final Object addConsumerLock = new Object();
+ private Map<ConsumerId, ConsumerId> consumers = new
ConcurrentHashMap<ConsumerId, ConsumerId>();
+
/**
* Constructor
*
@@ -197,14 +201,19 @@
* @throws Exception
*/
public Subscription addConsumer(ConnectionContext context, ConsumerInfo
info) throws Exception {
- // as master and slave do independent dispatch, the consumer add order
between master and slave
- // needs to be maintained
- synchronized (addConsumerLock) {
- sendSyncToSlave(info);
- return super.addConsumer(context, info);
- }
+ sendSyncToSlave(info);
+ consumers.put(info.getConsumerId(), info.getConsumerId());
+ return super.addConsumer(context, info);
}
+ @Override
+ public void removeConsumer(ConnectionContext context, ConsumerInfo info)
+ throws Exception {
+ super.removeConsumer(context, info);
+ consumers.remove(info.getConsumerId());
+ sendSyncToSlave(new RemoveInfo(info.getConsumerId()));
+ }
+
/**
* remove a subscription
*
@@ -317,7 +326,9 @@
if (messageDispatch.getMessage() != null) {
Message msg = messageDispatch.getMessage();
mdn.setMessageId(msg.getMessageId());
- sendSyncToSlave(mdn);
+ if (consumers.containsKey(messageDispatch.getConsumerId())) {
+ sendSyncToSlave(mdn);
+ }
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
Fri Mar 13 11:59:08 2009
@@ -418,6 +418,34 @@
Subscription sub =
subscriptions.get(messageDispatchNotification.getConsumerId());
if (sub != null) {
sub.processMessageDispatchNotification(messageDispatchNotification);
+ } else {
+ throw new JMSException("Slave broker out of sync with master -
Subscription: "
+ + messageDispatchNotification.getConsumerId()
+ + " on " + messageDispatchNotification.getDestination()
+ + " does not exist for dispatch of message: "
+ + messageDispatchNotification.getMessageId());
+ }
+ }
+
+ /*
+ * For a Queue/TempQueue, dispatch order is imperative to match acks, so
the dispatch is deferred till
+ * the notification to ensure that the subscription chosen by the master
is used. AMQ-2102
+ */
+ protected void
processDispatchNotificationViaDestination(MessageDispatchNotification
messageDispatchNotification) throws Exception {
+ Destination dest = null;
+ synchronized (destinationsMutex) {
+ dest =
destinations.get(messageDispatchNotification.getDestination());
+ }
+ if (dest != null) {
+ dest.processDispatchNotification(messageDispatchNotification);
+ } else {
+ throw new JMSException(
+ "Slave broker out of sync with master - Destination: "
+ + messageDispatchNotification.getDestination()
+ + " does not exist for consumer "
+ + messageDispatchNotification.getConsumerId()
+ + " with message: "
+ + messageDispatchNotification.getMessageId());
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Fri Mar 13 11:59:08 2009
@@ -18,6 +18,8 @@
import java.io.IOException;
+import javax.jms.JMSException;
+
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
@@ -27,6 +29,7 @@
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageStore;
@@ -485,4 +488,9 @@
}
}
}
+
+ public void processDispatchNotification(
+ MessageDispatchNotification messageDispatchNotification) throws
Exception {
+ }
+
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
Fri Mar 13 11:59:08 2009
@@ -28,6 +28,7 @@
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.Task;
@@ -175,4 +176,12 @@
void isFull(ConnectionContext context,Usage usage);
List<Subscription> getConsumers();
+
+ /**
+ * called on Queues in slave mode to allow dispatch to follow subscription
choice of master
+ * @param messageDispatchNotification
+ * @throws Exception
+ */
+ void processDispatchNotification(
+ MessageDispatchNotification messageDispatchNotification) throws
Exception;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
Fri Mar 13 11:59:08 2009
@@ -27,6 +27,7 @@
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.usage.MemoryUsage;
@@ -259,4 +260,9 @@
public void setMaxBrowsePageSize(int maxPageSize) {
next.setMaxBrowsePageSize(maxPageSize);
}
+
+ public void processDispatchNotification(
+ MessageDispatchNotification messageDispatchNotification) throws
Exception {
+ next.processDispatchNotification(messageDispatchNotification);
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Mar 13 11:59:08 2009
@@ -38,7 +38,6 @@
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
-import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -48,15 +47,14 @@
import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
import org.apache.activemq.broker.region.group.MessageGroupMap;
import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
-import org.apache.activemq.broker.region.group.MessageGroupSet;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerInfo;
@@ -65,7 +63,6 @@
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.selector.SelectorParser;
-import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.DeterministicTaskRunner;
@@ -1001,7 +998,7 @@
* @see org.apache.activemq.thread.Task#iterate()
*/
public boolean iterate() {
- boolean pageInMoreMessages = false;
+ boolean pageInMoreMessages = false;
synchronized(iteratingMutex) {
BrowserDispatch rd;
while ((rd = getNextBrowserDispatch()) != null) {
@@ -1244,13 +1241,13 @@
// Only page in the minimum number of messages which can be
dispatched immediately.
toPageIn = Math.min(getConsumerMessageCountBeforeFull(),
toPageIn);
}
- if ((force || !consumers.isEmpty()) && toPageIn > 0) {
- messages.setMaxBatchSize(toPageIn);
+
+ if ((force || !consumers.isEmpty()) && toPageIn > 0) {
int count = 0;
result = new ArrayList<QueueMessageReference>(toPageIn);
synchronized (messages) {
try {
-
+ messages.setMaxBatchSize(toPageIn);
messages.reset();
while (messages.hasNext() && count < toPageIn) {
MessageReference node = messages.next();
@@ -1326,7 +1323,8 @@
List<Subscription> consumers;
synchronized (this.consumers) {
- if (this.consumers.isEmpty()) {
+ if (this.consumers.isEmpty() || isSlave()) {
+ // slave dispatch happens in processDispatchNotification
return list;
}
consumers = new ArrayList<Subscription>(this.consumers);
@@ -1422,4 +1420,104 @@
return total;
}
+ /*
+ * In slave mode, dispatch is ignored till we get this notification as the
dispatch
+ * process is non deterministic between master and slave.
+ * On a notification, the actual dispatch to the subscription (as chosen
by the master)
+ * is completed.
+ * (non-Javadoc)
+ * @see
org.apache.activemq.broker.region.BaseDestination#processDispatchNotification(org.apache.activemq.command.MessageDispatchNotification)
+ */
+ public void processDispatchNotification(
+ MessageDispatchNotification messageDispatchNotification) throws
Exception {
+ // do dispatch
+ Subscription sub =
getMatchingSubscription(messageDispatchNotification);
+ if (sub != null) {
+ MessageReference message =
getMatchingMessage(messageDispatchNotification);
+ sub.add(message);
+
sub.processMessageDispatchNotification(messageDispatchNotification);
+ }
+ }
+
+ private QueueMessageReference
getMatchingMessage(MessageDispatchNotification messageDispatchNotification)
throws Exception {
+ QueueMessageReference message = null;
+ MessageId messageId = messageDispatchNotification.getMessageId();
+
+ dispatchLock.lock();
+ try {
+ synchronized (pagedInPendingDispatch) {
+ for(QueueMessageReference ref : pagedInPendingDispatch) {
+ if (messageId.equals(ref.getMessageId())) {
+ message = ref;
+ pagedInPendingDispatch.remove(ref);
+ break;
+ }
+ }
+ }
+
+ if (message == null) {
+ synchronized (pagedInMessages) {
+ message = pagedInMessages.get(messageId);
+ }
+ }
+
+ if (message == null) {
+ synchronized (messages) {
+ try {
+ messages.setMaxBatchSize(getMaxPageSize());
+ messages.reset();
+ while (messages.hasNext()) {
+ MessageReference node = messages.next();
+ node.incrementReferenceCount();
+ messages.remove();
+ if (messageId.equals(node.getMessageId())) {
+ message =
this.createMessageReference(node.getMessage());
+ break;
+ }
+ }
+ } finally {
+ messages.release();
+ }
+ }
+ }
+
+ if (message == null) {
+ Message msg = loadMessage(messageId);
+ if (msg != null) {
+ message = this.createMessageReference(msg);
+ }
+ }
+
+ } finally {
+ dispatchLock.unlock();
+ }
+ if (message == null) {
+ throw new JMSException(
+ "Slave broker out of sync with master - Message: "
+ + messageDispatchNotification.getMessageId()
+ + " on " + messageDispatchNotification.getDestination()
+ + " does not exist among pending(" +
pagedInPendingDispatch.size() + ") for subscription: "
+ + messageDispatchNotification.getConsumerId());
+ }
+ return message;
+ }
+
+ /**
+ * Find a consumer that matches the id in the message dispatch notification
+ * @param messageDispatchNotification
+ * @return sub or null if the subscription has been removed before dispatch
+ * @throws JMSException
+ */
+ private Subscription getMatchingSubscription(MessageDispatchNotification
messageDispatchNotification) throws JMSException {
+ Subscription sub = null;
+ synchronized (consumers) {
+ for (Subscription s : consumers) {
+ if
(messageDispatchNotification.getConsumerId().equals(s.getConsumerInfo().getConsumerId()))
{
+ sub = s;
+ break;
+ }
+ }
+ }
+ return sub;
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
Fri Mar 13 11:59:08 2009
@@ -24,6 +24,7 @@
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
@@ -64,4 +65,15 @@
}
return inactiveDestinations;
}
+
+ /*
+ * For a Queue, dispatch order is imperative to match acks, so the
dispatch is deferred till
+ * the notification to ensure that the subscription chosen by the master
is used.
+ *
+ * (non-Javadoc)
+ * @see
org.apache.activemq.broker.region.AbstractRegion#processDispatchNotification(org.apache.activemq.command.MessageDispatchNotification)
+ */
+ public void processDispatchNotification(MessageDispatchNotification
messageDispatchNotification) throws Exception {
+ processDispatchNotificationViaDestination(messageDispatchNotification);
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
Fri Mar 13 11:59:08 2009
@@ -22,6 +22,7 @@
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
@@ -72,4 +73,16 @@
super.removeDestination(context, destination, timeout);
}
+
+ /*
+ * For a Queue, dispatch order is imperative to match acks, so the
dispatch is deferred till
+ * the notification to ensure that the subscription chosen by the master
is used.
+ *
+ * (non-Javadoc)
+ * @see
org.apache.activemq.broker.region.AbstractRegion#processDispatchNotification(org.apache.activemq.command.MessageDispatchNotification)
+ */
+ public void processDispatchNotification(MessageDispatchNotification
messageDispatchNotification) throws Exception {
+ processDispatchNotificationViaDestination(messageDispatchNotification);
+ }
+
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
Fri Mar 13 11:59:08 2009
@@ -108,11 +108,8 @@
RegionBroker masterRb = (RegionBroker) broker.getBroker().getAdaptor(
RegionBroker.class);
- // REVISIT the following two are not dependable at the moment, off by
a small number
- // for some reason? The work for a COUNT < ~500
- //
- //assertEquals("inflight match",
rb.getDestinationStatistics().getInflight().getCount(),
masterRb.getDestinationStatistics().getInflight().getCount());
- //assertEquals("enqueues match",
rb.getDestinationStatistics().getEnqueues().getCount(),
masterRb.getDestinationStatistics().getEnqueues().getCount());
+ assertEquals("inflight match",
rb.getDestinationStatistics().getInflight().getCount(),
masterRb.getDestinationStatistics().getInflight().getCount());
+ assertEquals("enqueues match",
rb.getDestinationStatistics().getEnqueues().getCount(),
masterRb.getDestinationStatistics().getEnqueues().getCount());
assertEquals("dequeues match",
rb.getDestinationStatistics().getDequeues().getCount(),
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java
Fri Mar 13 11:59:08 2009
@@ -47,14 +47,15 @@
import org.apache.commons.logging.LogFactory;
public class AMQ2102Test extends TestCase implements UncaughtExceptionHandler {
+
+ final static int MESSAGE_COUNT = 12120;
+ final static int NUM_CONSUMERS = 10;
+ final static int CONSUME_ALL = -1;
- final static int MESSAGE_COUNT = 5120;
- final static int NUM_CONSUMERS = 20;
-
private static final Log LOG = LogFactory.getLog(AMQ2102Test.class);
- private final Map<Thread, Throwable> exceptions = new
ConcurrentHashMap<Thread, Throwable>();
+ private final static Map<Thread, Throwable> exceptions = new
ConcurrentHashMap<Thread, Throwable>();
private class Consumer implements Runnable, ExceptionListener {
private ActiveMQConnectionFactory connectionFactory;
@@ -63,12 +64,14 @@
private boolean running;
private org.omg.CORBA.IntHolder startup;
private Thread thread;
+ private int numToProcessPerIteration;
- Consumer(ActiveMQConnectionFactory connectionFactory, String
queueName, org.omg.CORBA.IntHolder startup, int id) {
+ Consumer(ActiveMQConnectionFactory connectionFactory, String
queueName, org.omg.CORBA.IntHolder startup, int id, int numToProcess) {
this.connectionFactory = connectionFactory;
this.queueName = queueName;
this.startup = startup;
name = "Consumer-" + queueName + "-" + id;
+ numToProcessPerIteration = numToProcess;
thread = new Thread(this, name);
}
@@ -93,6 +96,7 @@
}
public void onException(JMSException e) {
+ exceptions.put(Thread.currentThread(), e);
error("JMS exception: ", e);
}
@@ -146,7 +150,13 @@
Session session = null;
try {
session = connection.createSession(true,
Session.SESSION_TRANSACTED);
- processMessages(session);
+ if (numToProcessPerIteration > 0) {
+ while(isRunning()) {
+ processMessages(session);
+ }
+ } else {
+ processMessages(session);
+ }
} finally {
if (session != null) {
session.close();
@@ -189,7 +199,8 @@
}
startup = null;
}
- while (isRunning()) {
+ int numToProcess = numToProcessPerIteration;
+ do {
Message message = consumer.receive(5000);
if (message != null) {
@@ -201,7 +212,7 @@
session.rollback();
}
}
- }
+ } while ((numToProcess == CONSUME_ALL || --numToProcess > 0) &&
isRunning());
}
public void run() {
@@ -224,7 +235,7 @@
}
}
- private class Producer {
+ private class Producer implements ExceptionListener {
private ActiveMQConnectionFactory connectionFactory;
private String queueName;
@@ -246,6 +257,7 @@
try {
connection = (ActiveMQConnection)
connectionFactory.createConnection();
+ connection.setExceptionListener(this);
connection.start();
sendMessages(connection);
@@ -302,6 +314,7 @@
sendMessages(session, replyQueue, consumer);
} finally {
consumer.close();
+ session.commit();
}
}
@@ -326,9 +339,8 @@
}
}
- private void sendMessages(Session session, Destination replyQueue,
MessageConsumer consumer) throws JMSException {
+ private void sendMessages(final Session session, Destination
replyQueue, MessageConsumer consumer) throws JMSException {
final org.omg.CORBA.IntHolder messageCount = new
org.omg.CORBA.IntHolder(MESSAGE_COUNT);
-
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message reply) {
if (reply instanceof TextMessage) {
@@ -340,6 +352,15 @@
error("Problem processing reply", e);
}
messageCount.value--;
+ if (messageCount.value % 200 == 0) {
+ // ack a bunch of replys
+ info("acking via session commit:
messageCount=" + messageCount.value);
+ try {
+ session.commit();
+ } catch (JMSException e) {
+ error("Failed to commit with count: " +
messageCount.value, e);
+ }
+ }
messageCount.notify();
}
} else {
@@ -354,11 +375,7 @@
synchronized (messageCount) {
while (messageCount.value > 0) {
- if (messageCount.value % 100 == 0) {
- // ack a bunch of replys
- debug("acking via session commit: messageCount=" +
messageCount.value);
- session.commit();
- }
+
try {
messageCount.wait();
} catch (InterruptedException e) {
@@ -370,12 +387,21 @@
session.commit();
debug("All replies received...");
}
+
+ public void onException(JMSException exception) {
+ LOG.error(exception);
+ exceptions.put(Thread.currentThread(), exception);
+ }
}
private static void debug(String message) {
LOG.debug(message);
}
+ private static void info(String message) {
+ LOG.info(message);
+ }
+
private static void error(String message) {
LOG.error(message);
}
@@ -384,15 +410,17 @@
t.printStackTrace();
String msg = message + ": " + (t.getMessage() != null ? t.getMessage()
: t.toString());
LOG.error(msg, t);
+ exceptions.put(Thread.currentThread(), t);
fail(msg);
}
- private ArrayList<Consumer> createConsumers(ActiveMQConnectionFactory
connectionFactory, String queueName, int max) {
+ private ArrayList<Consumer> createConsumers(ActiveMQConnectionFactory
connectionFactory, String queueName,
+ int max, int numToProcessPerConsumer) {
ArrayList<Consumer> consumers = new ArrayList<Consumer>(max);
org.omg.CORBA.IntHolder startup = new org.omg.CORBA.IntHolder(max);
for (int id = 0; id < max; id++) {
- consumers.add(new Consumer(connectionFactory, queueName, startup,
id));
+ consumers.add(new Consumer(connectionFactory, queueName, startup,
id, numToProcessPerConsumer));
}
for (Consumer consumer : consumers) {
consumer.start();
@@ -445,6 +473,7 @@
public void tearDown() throws Exception {
master.stop();
slave.stop();
+ exceptions.clear();
}
public void testMasterSlaveBug() throws Exception {
@@ -453,7 +482,7 @@
ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("failover:(" +
masterUrl + ")?randomize=false");
String queueName = "MasterSlaveBug";
- ArrayList<Consumer> consumers = createConsumers(connectionFactory,
queueName, NUM_CONSUMERS);
+ ArrayList<Consumer> consumers = createConsumers(connectionFactory,
queueName, NUM_CONSUMERS, CONSUME_ALL);
Producer producer = new Producer(connectionFactory, queueName);
producer.execute(new String[]{});
@@ -468,10 +497,31 @@
assertTrue(exceptions.isEmpty());
}
+
+ public void testMasterSlaveBugWithStopStartConsumers() throws Exception {
+
+ Thread.setDefaultUncaughtExceptionHandler(this);
+ ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(
+ "failover:(" + masterUrl + ")?randomize=false");
+ String queueName = "MasterSlaveBug";
+ ArrayList<Consumer> consumers = createConsumers(connectionFactory,
+ queueName, NUM_CONSUMERS, 10);
+
+ Producer producer = new Producer(connectionFactory, queueName);
+ producer.execute(new String[] {});
+
+ for (Consumer consumer : consumers) {
+ consumer.setRunning(false);
+ }
+
+ for (Consumer consumer : consumers) {
+ consumer.join();
+ }
+ assertTrue(exceptions.isEmpty());
+ }
+
public void uncaughtException(Thread t, Throwable e) {
error("" + t + e);
exceptions.put(t,e);
-
-
}
}