Author: jstrachan
Date: Thu Dec 29 06:12:46 2005
New Revision: 359800
URL: http://svn.apache.org/viewcvs?rev=359800&view=rev
Log:
enable by default that non-persistent topic messages which have no consumers
are sent to a dead letter topic
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java?rev=359800&r1=359799&r2=359800&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
Thu Dec 29 06:12:46 2005
@@ -29,6 +29,10 @@
public static final ActiveMQTopic TEMP_TOPIC_ADVISORY_TOPIC = new
ActiveMQTopic(ADVISORY_TOPIC_PREFIX+"TempTopic");
public static final String PRODUCER_ADVISORY_TOPIC_PREFIX =
ADVISORY_TOPIC_PREFIX+"Producer.";
public static final String CONSUMER_ADVISORY_TOPIC_PREFIX =
ADVISORY_TOPIC_PREFIX+"Consumer.";
+ public static final String EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX =
ADVISORY_TOPIC_PREFIX+"Expired.Topic.";
+ public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX =
ADVISORY_TOPIC_PREFIX+"Expired.Queue.";
+ public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX =
ADVISORY_TOPIC_PREFIX+"NoConsumer.Topic.";
+ public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX =
ADVISORY_TOPIC_PREFIX+"NoConsumer.Queue.";
public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
public static final ActiveMQTopic
TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new
ActiveMQTopic(TEMP_QUEUE_ADVISORY_TOPIC+","+TEMP_TOPIC_ADVISORY_TOPIC);
@@ -43,6 +47,26 @@
public static ActiveMQTopic getProducerAdvisoryTopic(ActiveMQDestination
destination) {
String name =
PRODUCER_ADVISORY_TOPIC_PREFIX+destination.getQualifiedName();
+ return new ActiveMQTopic(name);
+ }
+
+ public static ActiveMQTopic
getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination) {
+ String name =
EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX+destination.getQualifiedName();
+ return new ActiveMQTopic(name);
+ }
+
+ public static ActiveMQTopic
getExpiredQueueMessageAdvisoryTopic(ActiveMQDestination destination) {
+ String name =
EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX+destination.getQualifiedName();
+ return new ActiveMQTopic(name);
+ }
+
+ public static ActiveMQTopic
getNoTopicConsumersAdvisoryTopic(ActiveMQDestination destination) {
+ String name =
NO_TOPIC_CONSUMERS_TOPIC_PREFIX+destination.getQualifiedName();
+ return new ActiveMQTopic(name);
+ }
+
+ public static ActiveMQTopic
getNoQueueConsumersAdvisoryTopic(ActiveMQDestination destination) {
+ String name =
NO_QUEUE_CONSUMERS_TOPIC_PREFIX+destination.getQualifiedName();
return new ActiveMQTopic(name);
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=359800&r1=359799&r2=359800&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Thu Dec 29 06:12:46 2005
@@ -18,12 +18,14 @@
import java.io.IOException;
+import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import
org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
@@ -57,9 +59,10 @@
private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new
LastImageSubscriptionRecoveryPolicy();
+ private boolean sendAdvisoryIfNoConsumers = true;
- public Topic(ActiveMQDestination destination, TopicMessageStore store,
UsageManager memoryManager,
- DestinationStatistics parentStats, TaskRunnerFactory taskFactory) {
+ public Topic(ActiveMQDestination destination, TopicMessageStore store,
UsageManager memoryManager, DestinationStatistics parentStats,
+ TaskRunnerFactory taskFactory) {
this.destination = destination;
this.store = store;
@@ -166,33 +169,34 @@
public void send(final ConnectionContext context, final Message message)
throws Throwable {
- if( context.isProducerFlowControl() )
+ if (context.isProducerFlowControl())
usageManager.waitForSpace();
-
+
message.setRegionDestination(this);
-
+
if (store != null && message.isPersistent())
store.addMessage(context, message);
message.incrementReferenceCount();
try {
-
+
if (context.isInTransaction()) {
context.getTransaction().addSynchronization(new
Synchronization() {
public void afterCommit() throws Throwable {
dispatch(context, message);
}
});
-
+
}
else {
dispatch(context, message);
}
-
- } finally {
+
+ }
+ finally {
message.decrementReferenceCount();
}
-
+
}
public void deleteSubscription(ConnectionContext context, SubscriptionKey
key) throws IOException {
@@ -236,7 +240,7 @@
// Properties
//
-------------------------------------------------------------------------
-
+
public UsageManager getUsageManager() {
return usageManager;
}
@@ -265,12 +269,26 @@
this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
}
+ public boolean isSendAdvisoryIfNoConsumers() {
+ return sendAdvisoryIfNoConsumers;
+ }
+
+ public void setSendAdvisoryIfNoConsumers(boolean
sendAdvisoryIfNoConsumers) {
+ this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
+ }
+
+ public MessageStore getMessageStore() {
+ return store;
+ }
+
+ // Implementation methods
+ //
-------------------------------------------------------------------------
protected void dispatch(ConnectionContext context, Message message) throws
Throwable {
destinationStatistics.getEnqueues().increment();
dispatchValve.increment();
MessageEvaluationContext msgContext =
context.getMessageEvaluationContext();
try {
- if (! subscriptionRecoveryPolicy.add(context, message)) {
+ if (!subscriptionRecoveryPolicy.add(context, message)) {
return;
}
if (consumers.isEmpty()) {
@@ -280,7 +298,7 @@
msgContext.setDestination(destination);
msgContext.setMessageReference(message);
-
+
if (!dispatchPolicy.dispatch(context, message, msgContext,
consumers)) {
onMessageWithNoConsumers(context, message);
}
@@ -291,17 +309,23 @@
}
}
- /**
- * Provides a hook to allow messages with no consumer to be processed in
some way - such as to send to a dead letter queue or something..
+ /**
+ * Provides a hook to allow messages with no consumer to be processed in
+ * some way - such as to send to a dead letter queue or something..
*/
- protected void onMessageWithNoConsumers(ConnectionContext context, Message
message) {
- if (! message.isPersistent()) {
- // allow messages with no consumers to be dispatched to a dead
letter queue
+ protected void onMessageWithNoConsumers(ConnectionContext context, Message
message) throws Throwable {
+ if (!message.isPersistent()) {
+ if (sendAdvisoryIfNoConsumers) {
+ // allow messages with no consumers to be dispatched to a dead
+ // letter queue
+ ActiveMQDestination originalDestination =
message.getDestination();
+ if (!AdvisorySupport.isAdvisoryTopic(originalDestination)) {
+ ActiveMQTopic advisoryTopic =
AdvisorySupport.getExpiredTopicMessageAdvisoryTopic(originalDestination);
+ message.setDestination(advisoryTopic);
+ context.getBroker().send(context, message);
+ }
+ }
}
- }
-
- public MessageStore getMessageStore() {
- return store;
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=359800&r1=359799&r2=359800&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Thu Dec 29 06:12:46 2005
@@ -23,7 +23,7 @@
/**
* Represents an entry in a [EMAIL PROTECTED] PolicyMap} for assigning
policies to a
- * specific destination or a hierarchial wildcard area of destinations.
+ * specific destination or a hierarchical wildcard area of destinations.
*
* @org.xbean.XBean
*
@@ -34,6 +34,7 @@
private DispatchPolicy dispatchPolicy;
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
private RedeliveryPolicy redeliveryPolicy;
+ private boolean sendAdvisoryIfNoConsumers = true;
public void configure(Queue queue) {
if (dispatchPolicy != null) {
@@ -48,6 +49,7 @@
if (subscriptionRecoveryPolicy != null) {
topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy);
}
+ topic.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
}
// Properties
@@ -76,4 +78,15 @@
this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
}
+ public boolean isSendAdvisoryIfNoConsumers() {
+ return sendAdvisoryIfNoConsumers;
+ }
+
+ /**
+ * Sends an advisory message if a non-persistent message is sent and there
+ * are no active consumers
+ */
+ public void setSendAdvisoryIfNoConsumers(boolean
sendAdvisoryIfNoConsumers) {
+ this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
+ }
}