Author: chirino
Date: Sat Dec 31 08:50:52 2005
New Revision: 360329
URL: http://svn.apache.org/viewcvs?rev=360329&view=rev
Log:
Improved the advisory message being sent by topics when no consumer is
listening:
- It is sent non-transactionally
- It does not block due to flow control
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=360329&r1=360328&r2=360329&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Sat Dec 31 08:50:52 2005
@@ -152,6 +152,8 @@
} else if( ack.isPoisonAck() ) {
+ // TODO: what if the message is already in a DLQ???
+
// Handle the poison ACK case: we need to send the message to a
DLQ
if( ack.isInTransaction() )
throw new JMSException("Poison ack cannot be transacted:
"+ack);
@@ -175,20 +177,16 @@
Message message = node.getMessage();
if( message !=null ) {
- // TODO is this meant to be == null?
+ // The original destination and transaction id do
not get filled when the message is first sent,
+ // it is only populated if the message is routed
to another destination like the DLQ
if( message.getOriginalDestination()!=null )
message.setOriginalDestination(message.getDestination());
+ if( message.getOriginalTransactionId()!=null )
+
message.setOriginalTransactionId(message.getTransactionId());
- ActiveMQDestination originalDestination =
message.getOriginalDestination();
- if (originalDestination == null) {
- originalDestination = message.getDestination();
- }
DeadLetterStrategy deadLetterStrategy =
node.getRegionDestination().getDeadLetterStrategy();
- ActiveMQDestination deadLetterDestination =
deadLetterStrategy.getDeadLetterQueueFor(originalDestination);
+ ActiveMQDestination deadLetterDestination =
deadLetterStrategy.getDeadLetterQueueFor(message.getDestination());
message.setDestination(deadLetterDestination);
-
- if( message.getOriginalTransactionId()!=null )
-
message.setOriginalTransactionId(message.getTransactionId());
message.setTransactionId(null);
message.evictMarshlledForm();
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=360329&r1=360328&r2=360329&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Sat Dec 31 08:50:52 2005
@@ -338,11 +338,29 @@
if (sendAdvisoryIfNoConsumers) {
// allow messages with no consumers to be dispatched to a dead
// letter queue
- ActiveMQDestination originalDestination =
message.getDestination();
- if (!AdvisorySupport.isAdvisoryTopic(originalDestination)) {
- ActiveMQTopic advisoryTopic =
AdvisorySupport.getNoTopicConsumersAdvisoryTopic(originalDestination);
+ if (!AdvisorySupport.isAdvisoryTopic(destination)) {
+
+ // The original destination and transaction id do not get
filled when the message is first sent,
+ // it is only populated if the message is routed to
another destination like the DLQ
+ if( message.getOriginalDestination()!=null )
+
message.setOriginalDestination(message.getDestination());
+ if( message.getOriginalTransactionId()!=null )
+
message.setOriginalTransactionId(message.getTransactionId());
+
+ ActiveMQTopic advisoryTopic =
AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
message.setDestination(advisoryTopic);
- context.getBroker().send(context, message);
+ message.setTransactionId(null);
+ message.evictMarshlledForm();
+
+ // Disable flow control for this since we don't want
to block.
+ boolean originalFlowControl =
context.isProducerFlowControl();
+ try {
+ context.setProducerFlowControl(false);
+ context.getBroker().send(context, message);
+ } finally {
+ context.setProducerFlowControl(originalFlowControl);
+ }
+
}
}
}