Author: gtully
Date: Mon Apr 12 14:03:18 2010
New Revision: 933240
URL: http://svn.apache.org/viewvc?rev=933240&view=rev
Log:
Resolve https://issues.apache.org/activemq/browse/AMQ-2693 - on a loaded machine,
slow thread creation can delay transport interruption processing past the next
dispatch, which can be problematic. Setting prefetch=1 is a workaround.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=933240&r1=933239&r2=933240&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Mon Apr 12 14:03:18 2010
@@ -144,6 +144,7 @@ public class ActiveMQMessageConsumer imp
private ExecutorService executorService;
private MessageTransformer transformer;
private boolean clearDispatchList;
+ boolean inProgressClearRequiredFlag;
private MessageAck pendingAck;
private long lastDeliveredSequenceId;
@@ -655,23 +656,32 @@ public class ActiveMQMessageConsumer imp
this.session.asyncSendPacket(removeCommand);
}
- void clearMessagesInProgress() {
+ void inProgressClearRequired() {
+ inProgressClearRequiredFlag = true;
// deal with delivered messages async to avoid lock contention with in
progress acks
clearDispatchList = true;
- synchronized (unconsumedMessages.getMutex()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(getConsumerId() + " clearing dispatched list (" +
unconsumedMessages.size() + ") on transport interrupt");
- }
- // ensure unconsumed are rolledback up front as they may get
redelivered to another consumer
- List<MessageDispatch> list = unconsumedMessages.removeAll();
- if (!this.info.isBrowser()) {
- for (MessageDispatch old : list) {
- session.connection.rollbackDuplicate(this,
old.getMessage());
+ }
+
+ void clearMessagesInProgress() {
+ if (inProgressClearRequiredFlag) {
+ synchronized (unconsumedMessages.getMutex()) {
+ if (inProgressClearRequiredFlag) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getConsumerId() + " clearing dispatched list
(" + unconsumedMessages.size() + ") on transport interrupt");
+ }
+ // ensure unconsumed are rolledback up front as they may
get redelivered to another consumer
+ List<MessageDispatch> list =
unconsumedMessages.removeAll();
+ if (!this.info.isBrowser()) {
+ for (MessageDispatch old : list) {
+ session.connection.rollbackDuplicate(this,
old.getMessage());
+ }
+ }
+ // allow dispatch on this connection to resume
+
session.connection.transportInterruptionProcessingComplete();
+ inProgressClearRequiredFlag = false;
}
}
}
- // allow dispatch on this connection to resume
- session.connection.transportInterruptionProcessingComplete();
}
void deliverAcks() {
@@ -1192,6 +1202,7 @@ public class ActiveMQMessageConsumer imp
public void dispatch(MessageDispatch md) {
MessageListener listener = this.messageListener.get();
try {
+ clearMessagesInProgress();
clearDispatchList();
synchronized (unconsumedMessages.getMutex()) {
if (!unconsumedMessages.isClosed()) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=933240&r1=933239&r2=933240&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
Mon Apr 12 14:03:18 2010
@@ -649,6 +649,7 @@ public class ActiveMQSession implements
// connection.transportInterruptionProcessingComplete()
//
for (final ActiveMQMessageConsumer consumer : consumers) {
+ consumer.inProgressClearRequired();
scheduler.executeAfterDelay(new Runnable() {
public void run() {
consumer.clearMessagesInProgress();