Author: dejanb
Date: Tue Feb 17 12:56:24 2009
New Revision: 745031
URL: http://svn.apache.org/viewvc?rev=745031&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-2120
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=745031&r1=745030&r2=745031&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Tue Feb 17 12:56:24 2009
@@ -17,12 +17,18 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
+
+import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Message;
import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
@@ -57,6 +63,7 @@
private boolean advisoryWhenFull;
private boolean advisoryForDelivery;
private boolean advisoryForConsumed;
+ private boolean sendAdvisoryIfNoConsumers;
protected final DestinationStatistics destinationStatistics = new
DestinationStatistics();
protected final BrokerService brokerService;
protected final Broker regionBroker;
@@ -323,6 +330,14 @@
public void setAdvisdoryForFastProducers(boolean
advisdoryForFastProducers) {
this.advisdoryForFastProducers = advisdoryForFastProducers;
}
+
+ public boolean isSendAdvisoryIfNoConsumers() {
+ return sendAdvisoryIfNoConsumers;
+ }
+
+ public void setSendAdvisoryIfNoConsumers(boolean
sendAdvisoryIfNoConsumers) {
+ this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
+ }
/**
* @return the dead letter strategy
@@ -420,4 +435,54 @@
this.destinationStatistics.setParent(null);
this.memoryUsage.stop();
}
+
+    /**
+     * Hook for a message that arrived while this destination had no consumers.
+     * When {@code sendAdvisoryIfNoConsumers} is enabled, a non-persistent message
+     * on a queue or non-advisory topic is re-sent to the matching "no consumers"
+     * advisory topic, where a subscriber (e.g. a dead letter handler) can see it.
+     *
+     * @param context connection context the message was received on; its
+     *                producer flow control flag is switched off for the resend
+     *                and restored afterwards
+     * @param message the message to forward; its destination and transaction id
+     *                are overwritten in place
+     * @throws Exception propagated from the broker send of the advisory message
+     */
+    protected void onMessageWithNoConsumers(ConnectionContext context, Message message) throws Exception {
+        if (message.isPersistent() || !isSendAdvisoryIfNoConsumers()) {
+            return;
+        }
+        // never raise a "no consumers" advisory about an advisory topic itself
+        if (!destination.isQueue() && AdvisorySupport.isAdvisoryTopic(destination)) {
+            return;
+        }
+        // The original destination and transaction id are not filled in when the
+        // message is first sent; record them on the first re-route only, so a
+        // value set by an earlier re-route is not overwritten.
+        if (message.getOriginalDestination() == null) {
+            message.setOriginalDestination(message.getDestination());
+        }
+        if (message.getOriginalTransactionId() == null) {
+            message.setOriginalTransactionId(message.getTransactionId());
+        }
+        ActiveMQTopic advisoryTopic = destination.isQueue()
+                ? AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination)
+                : AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
+        message.setDestination(advisoryTopic);
+        message.setTransactionId(null);
+
+        // Disable flow control for this send since we don't want to block.
+        boolean originalFlowControl = context.isProducerFlowControl();
+        try {
+            context.setProducerFlowControl(false);
+            ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
+            producerExchange.setMutable(false);
+            producerExchange.setConnectionContext(context);
+            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
+            context.getBroker().send(producerExchange, message);
+        } finally {
+            context.setProducerFlowControl(originalFlowControl);
+        }
+    }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=745031&r1=745030&r2=745031&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Tue Feb 17 12:56:24 2009
@@ -37,6 +37,7 @@
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
+import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -50,6 +51,7 @@
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
@@ -62,6 +64,7 @@
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.DeterministicTaskRunner;
@@ -1210,6 +1213,11 @@
destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessages().increment();
messageDelivered(context, msg);
+ synchronized (consumers) {
+ if (consumers.isEmpty()) {
+ onMessageWithNoConsumers(context, msg);
+ }
+ }
wakeup();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=745031&r1=745030&r2=745031&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Tue Feb 17 12:56:24 2009
@@ -70,7 +70,6 @@
protected final Valve dispatchValve = new Valve(true);
private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
- private boolean sendAdvisoryIfNoConsumers;
private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>
durableSubcribers = new ConcurrentHashMap<SubscriptionKey,
DurableTopicSubscription>();
private final TaskRunner taskRunner;
private final LinkedList<Runnable> messagesWaitingForSpace = new
LinkedList<Runnable>();
@@ -541,14 +540,6 @@
this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
}
- public boolean isSendAdvisoryIfNoConsumers() {
- return sendAdvisoryIfNoConsumers;
- }
-
- public void setSendAdvisoryIfNoConsumers(boolean
sendAdvisoryIfNoConsumers) {
- this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
- }
-
// Implementation methods
//
-------------------------------------------------------------------------
@@ -601,48 +592,5 @@
}
}
- /**
- * Provides a hook to allow messages with no consumer to be processed in
- * some way - such as to send to a dead letter queue or something..
- */
- protected void onMessageWithNoConsumers(ConnectionContext context, Message
message) throws Exception {
- if (!message.isPersistent()) {
- if (sendAdvisoryIfNoConsumers) {
- // allow messages with no consumers to be dispatched to a dead
- // letter queue
- if (!AdvisorySupport.isAdvisoryTopic(destination)) {
-
- // The original destination and transaction id do not get
- // filled when the message is first sent,
- // it is only populated if the message is routed to another
- // destination like the DLQ
- if (message.getOriginalDestination() != null) {
-
message.setOriginalDestination(message.getDestination());
- }
- if (message.getOriginalTransactionId() != null) {
-
message.setOriginalTransactionId(message.getTransactionId());
- }
-
- ActiveMQTopic advisoryTopic =
AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
- message.setDestination(advisoryTopic);
- message.setTransactionId(null);
-
- // Disable flow control for this since since we don't want
- // to block.
- boolean originalFlowControl =
context.isProducerFlowControl();
- try {
- context.setProducerFlowControl(false);
- ProducerBrokerExchange producerExchange = new
ProducerBrokerExchange();
- producerExchange.setMutable(false);
- producerExchange.setConnectionContext(context);
- producerExchange.setProducerState(new
ProducerState(new ProducerInfo()));
- context.getBroker().send(producerExchange, message);
- } finally {
- context.setProducerFlowControl(originalFlowControl);
- }
- }
- }
- }
- }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=745031&r1=745030&r2=745031&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Tue Feb 17 12:56:24 2009
@@ -110,7 +110,6 @@
if (subscriptionRecoveryPolicy != null) {
topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy.copy());
}
- topic.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
if (memoryLimit > 0) {
topic.getMemoryUsage().setLimit(memoryLimit);
}
@@ -132,6 +131,7 @@
destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers());
destination.setAdvisdoryForFastProducers(isAdvisdoryForFastProducers());
destination.setAdvisoryWhenFull(isAdvisoryWhenFull());
+ destination.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
}
public void configure(Broker broker, SystemUsage memoryManager,
TopicSubscription subscription) {
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java?rev=745031&r1=745030&r2=745031&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java
Tue Feb 17 12:56:24 2009
@@ -37,9 +37,6 @@
public void testDurableTopicMessage() throws Exception {
}
- public void testTransientQueueMessage() throws Exception {
- }
-
protected void doTest() throws Exception {
makeDlqConsumer();
sendMessages();
@@ -65,7 +62,11 @@
}
protected Destination createDlqDestination() {
- return
AdvisorySupport.getNoTopicConsumersAdvisoryTopic((ActiveMQDestination)getDestination());
+ if (this.topic) {
+ return
AdvisorySupport.getNoTopicConsumersAdvisoryTopic((ActiveMQDestination)getDestination());
+ } else {
+ return
AdvisorySupport.getNoQueueConsumersAdvisoryTopic((ActiveMQDestination)getDestination());
+ }
}
}