Author: jstrachan
Date: Wed Dec 28 07:33:05 2005
New Revision: 359547
URL: http://svn.apache.org/viewcvs?rev=359547&view=rev
Log:
minor refactor to provide a hook when dispatching messages which have no
consumers; so that we can for example, send them to a dead letter queue
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/Topic.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/DispatchPolicy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/SimpleDispatchPolicy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/StrictOrderDispatchPolicy.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/Topic.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/Topic.java?rev=359547&r1=359546&r2=359547&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/Topic.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/Topic.java
Wed Dec 28 07:33:05 2005
@@ -273,17 +273,30 @@
if (! subscriptionRecoveryPolicy.add(context, message)) {
return;
}
- if (consumers.isEmpty())
+ if (consumers.isEmpty()) {
+ onMessageWithNoConsumers(context, message);
return;
+ }
msgContext.setDestination(destination);
msgContext.setMessageReference(message);
- dispatchPolicy.dispatch(context, message, msgContext, consumers);
+ if (!dispatchPolicy.dispatch(context, message, msgContext,
consumers)) {
+ onMessageWithNoConsumers(context, message);
+ }
}
finally {
msgContext.clear();
dispatchValve.decrement();
+ }
+ }
+
+ /**
+ * Provides a hook to allow messages with no consumer to be processed in
some way - such as to send to a dead letter queue or something..
+ */
+ protected void onMessageWithNoConsumers(ConnectionContext context, Message
message) {
+ if (! message.isPersistent()) {
+ // allow messages with no consumers to be dispatched to a dead
letter queue
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/DispatchPolicy.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/DispatchPolicy.java?rev=359547&r1=359546&r2=359547&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/DispatchPolicy.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/DispatchPolicy.java
Wed Dec 28 07:33:05 2005
@@ -41,7 +41,9 @@
* large pre-fetch may take all the messages if he is always dispatched to
first.
* Once a message has been locked, it does not need to be dispatched to
any
* further subscriptions.
+ *
+ * @return true if at least one consumer was dispatched or false if there
are no active subscriptions that could be dispatched
*/
- void dispatch(ConnectionContext newParam, MessageReference node,
MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws
Throwable;
+ boolean dispatch(ConnectionContext newParam, MessageReference node,
MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws
Throwable;
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/RoundRobinDispatchPolicy.java?rev=359547&r1=359546&r2=359547&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
Wed Dec 28 07:33:05 2005
@@ -37,12 +37,13 @@
private final Object mutex = new Object();
- public void dispatch(ConnectionContext newParam, MessageReference node,
MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws
Throwable {
+ public boolean dispatch(ConnectionContext newParam, MessageReference node,
MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws
Throwable {
// Big synch here so that only 1 message gets dispatched at a time.
Ensures
// Everyone sees the same order and that the consumer list is not used
while
// it's being rotated.
synchronized(mutex) {
+ int count = 0;
for (Iterator iter = consumers.iterator(); iter.hasNext();) {
Subscription sub = (Subscription) iter.next();
@@ -52,6 +53,7 @@
continue;
sub.add(node);
+ count++;
}
// Rotate the consumer list.
@@ -59,6 +61,7 @@
consumers.add(consumers.remove(0));
} catch (Throwable bestEffort) {
}
+ return count > 0;
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/SimpleDispatchPolicy.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/SimpleDispatchPolicy.java?rev=359547&r1=359546&r2=359547&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/SimpleDispatchPolicy.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/SimpleDispatchPolicy.java
Wed Dec 28 07:33:05 2005
@@ -35,8 +35,8 @@
*/
public class SimpleDispatchPolicy implements DispatchPolicy {
- public void dispatch(ConnectionContext newParam, MessageReference node,
MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws
Throwable {
-
+ public boolean dispatch(ConnectionContext context, MessageReference node,
MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws
Throwable {
+ int count = 0;
for (Iterator iter = consumers.iterator(); iter.hasNext();) {
Subscription sub = (Subscription) iter.next();
@@ -48,7 +48,9 @@
continue;
sub.add(node);
+ count++;
}
+ return count > 0;
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/StrictOrderDispatchPolicy.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/StrictOrderDispatchPolicy.java?rev=359547&r1=359546&r2=359547&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/StrictOrderDispatchPolicy.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/StrictOrderDispatchPolicy.java
Wed Dec 28 07:33:05 2005
@@ -36,11 +36,11 @@
int i=0;
private final Object mutex = new Object();
- public void dispatch(ConnectionContext newParam, MessageReference node,
MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws
Throwable {
-
+ public boolean dispatch(ConnectionContext newParam, MessageReference node,
MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws
Throwable {
// Big synch here so that only 1 message gets dispatched at a time.
Ensures
// Everyone sees the same order.
synchronized(mutex) {
+ int count = 0;
i++;
for (Iterator iter = consumers.iterator(); iter.hasNext();) {
Subscription sub = (Subscription) iter.next();
@@ -50,7 +50,9 @@
continue;
sub.add(node);
+ count++;
}
+ return count > 0;
}
}