Author: chirino
Date: Tue Apr 1 06:22:48 2008
New Revision: 643390
URL: http://svn.apache.org/viewvc?rev=643390&view=rev
Log:
When pulling a message, iterate the destinations first to make sure that it has
pushed all available messages to
the sub. This should fix the ZeroPrefetchTest that was intermitently failing
on slower machines.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=643390&r1=643389&r2=643390&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
Tue Apr 1 06:22:48 2008
@@ -27,12 +27,13 @@
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.thread.Task;
import org.apache.activemq.usage.MemoryUsage;
/**
* @version $Revision: 1.12 $
*/
-public interface Destination extends Service {
+public interface Destination extends Service, Task {
void addSubscription(ConnectionContext context, Subscription sub) throws
Exception;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=643390&r1=643389&r2=643390&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
Tue Apr 1 06:22:48 2008
@@ -206,4 +206,8 @@
public void messageExpired(ConnectionContext context, PrefetchSubscription
prefetchSubscription, MessageReference node) {
next.messageExpired(context, prefetchSubscription, node);
}
+
+ public boolean iterate() {
+ return next.iterate();
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=643390&r1=643389&r2=643390&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Tue Apr 1 06:22:48 2008
@@ -79,32 +79,43 @@
/**
* Allows a message to be pulled on demand by a client
*/
- public synchronized Response pullMessage(ConnectionContext context,
MessagePull pull) throws Exception {
+ public Response pullMessage(ConnectionContext context, MessagePull pull)
throws Exception {
// The slave should not deliver pull messages. TODO: when the slave
// becomes a master,
// He should send a NULL message to all the consumers to 'wake them up'
// in case
// they were waiting for a message.
if (getPrefetchSize() == 0 && !isSlave()) {
- prefetchExtension++;
- final long dispatchCounterBeforePull = dispatchCounter;
- dispatchPending();
- // If there was nothing dispatched.. we may need to setup a
timeout.
- if (dispatchCounterBeforePull == dispatchCounter) {
- // imediate timeout used by receiveNoWait()
- if (pull.getTimeout() == -1) {
- // Send a NULL message.
- add(QueueMessageReference.NULL_MESSAGE);
- dispatchPending();
- }
- if (pull.getTimeout() > 0) {
- Scheduler.executeAfterDelay(new Runnable() {
-
- public void run() {
- pullTimeout(dispatchCounterBeforePull);
- }
- }, pull.getTimeout());
- }
+ final long dispatchCounterBeforePull;
+ synchronized(this) {
+ prefetchExtension++;
+ dispatchCounterBeforePull = dispatchCounter;
+ }
+
+ // Have the destination push us some messages.
+ for (Destination dest : destinations) {
+ dest.iterate();
+ }
+ dispatchPending();
+
+ synchronized(this) {
+ // If there was nothing dispatched.. we may need to setup a
timeout.
+ if (dispatchCounterBeforePull == dispatchCounter) {
+ // imediate timeout used by receiveNoWait()
+ if (pull.getTimeout() == -1) {
+ // Send a NULL message.
+ add(QueueMessageReference.NULL_MESSAGE);
+ dispatchPending();
+ }
+ if (pull.getTimeout() > 0) {
+ Scheduler.executeAfterDelay(new Runnable() {
+
+ public void run() {
+ pullTimeout(dispatchCounterBeforePull);
+ }
+ }, pull.getTimeout());
+ }
+ }
}
}
return null;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=643390&r1=643389&r2=643390&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Tue Apr 1 06:22:48 2008
@@ -99,6 +99,7 @@
wakeup();
}
};
+ private final Object iteratingMutex = new Object() {};
private static final Comparator<Subscription>orderedCompare = new
Comparator<Subscription>() {
@@ -914,51 +915,52 @@
* @see org.apache.activemq.thread.Task#iterate()
*/
public boolean iterate() {
-
- RecoveryDispatch rd;
- while ((rd = getNextRecoveryDispatch()) != null) {
- try {
- MessageEvaluationContext msgContext = new
NonCachedMessageEvaluationContext();
- msgContext.setDestination(destination);
-
- for (QueueMessageReference node : rd.messages) {
- if (!node.isDropped() && !node.isAcked() &&
(!node.isDropped() || rd.subscription.getConsumerInfo().isBrowser())) {
- msgContext.setMessageReference(node);
- if (rd.subscription.matches(node, msgContext)) {
- rd.subscription.add(node);
- }
- }
- }
-
- if( rd.subscription instanceof QueueBrowserSubscription ) {
-
((QueueBrowserSubscription)rd.subscription).decrementQueueRef();
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- boolean result = false;
- synchronized (messages) {
- result = !messages.isEmpty();
- }
-
- if (result) {
- try {
- pageInMessages(false);
-
- } catch (Throwable e) {
- log.error("Failed to page in more queue messages ", e);
- }
- }
- synchronized(messagesWaitingForSpace) {
- while (!messagesWaitingForSpace.isEmpty() &&
!memoryUsage.isFull()) {
- Runnable op = messagesWaitingForSpace.removeFirst();
- op.run();
- }
+ synchronized(iteratingMutex) {
+ RecoveryDispatch rd;
+ while ((rd = getNextRecoveryDispatch()) != null) {
+ try {
+ MessageEvaluationContext msgContext = new
NonCachedMessageEvaluationContext();
+ msgContext.setDestination(destination);
+
+ for (QueueMessageReference node : rd.messages) {
+ if (!node.isDropped() && !node.isAcked() &&
(!node.isDropped() || rd.subscription.getConsumerInfo().isBrowser())) {
+ msgContext.setMessageReference(node);
+ if (rd.subscription.matches(node,
msgContext)) {
+ rd.subscription.add(node);
+ }
+ }
+ }
+
+ if( rd.subscription instanceof QueueBrowserSubscription
) {
+
((QueueBrowserSubscription)rd.subscription).decrementQueueRef();
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ boolean result = false;
+ synchronized (messages) {
+ result = !messages.isEmpty();
+ }
+
+ if (result) {
+ try {
+ pageInMessages(false);
+
+ } catch (Throwable e) {
+ log.error("Failed to page in more queue messages ", e);
+ }
+ }
+ synchronized(messagesWaitingForSpace) {
+ while (!messagesWaitingForSpace.isEmpty() &&
!memoryUsage.isFull()) {
+ Runnable op = messagesWaitingForSpace.removeFirst();
+ op.run();
+ }
+ }
+ return false;
}
- return false;
}
protected MessageReferenceFilter createMessageIdFilter(final String
messageId) {