Author: chirino
Date: Sat Dec 31 08:50:52 2005
New Revision: 360329

URL: http://svn.apache.org/viewcvs?rev=360329&view=rev
Log:
Improved the advisory message being sent by topics when no consumer is 
listening:
 - It is sent non-transactionally
 - It does not block due to flow control

Modified:
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=360329&r1=360328&r2=360329&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 Sat Dec 31 08:50:52 2005
@@ -152,6 +152,8 @@
             
         } else if( ack.isPoisonAck() ) {
             
+            // TODO: what if the message is already in a DLQ???
+            
             // Handle the poison ACK case: we need to send the message to a 
DLQ  
             if( ack.isInTransaction() )
                 throw new JMSException("Poison ack cannot be transacted: 
"+ack);
@@ -175,20 +177,16 @@
                         Message message = node.getMessage();
                         if( message !=null ) {
                             
-                            // TODO is this meant to be == null?
+                            // The original destination and transaction id do 
not get filled when the message is first sent,
+                            // it is only populated if the message is routed 
to another destination like the DLQ
                             if( message.getOriginalDestination()!=null )
                                 
message.setOriginalDestination(message.getDestination());
+                            if( message.getOriginalTransactionId()!=null )
+                                
message.setOriginalTransactionId(message.getTransactionId());
                             
-                            ActiveMQDestination originalDestination = 
message.getOriginalDestination();
-                            if (originalDestination == null) {
-                                originalDestination = message.getDestination();
-                            }
                             DeadLetterStrategy deadLetterStrategy = 
node.getRegionDestination().getDeadLetterStrategy();
-                            ActiveMQDestination deadLetterDestination = 
deadLetterStrategy.getDeadLetterQueueFor(originalDestination);
+                            ActiveMQDestination deadLetterDestination = 
deadLetterStrategy.getDeadLetterQueueFor(message.getDestination());
                             message.setDestination(deadLetterDestination);
-                            
-                            if( message.getOriginalTransactionId()!=null )
-                                
message.setOriginalTransactionId(message.getTransactionId());
                             message.setTransactionId(null);
                             message.evictMarshlledForm();
 

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=360329&r1=360328&r2=360329&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
 Sat Dec 31 08:50:52 2005
@@ -338,11 +338,29 @@
             if (sendAdvisoryIfNoConsumers) {
                 // allow messages with no consumers to be dispatched to a dead
                 // letter queue
-                ActiveMQDestination originalDestination = 
message.getDestination();
-                if (!AdvisorySupport.isAdvisoryTopic(originalDestination)) {
-                    ActiveMQTopic advisoryTopic = 
AdvisorySupport.getNoTopicConsumersAdvisoryTopic(originalDestination);
+                if (!AdvisorySupport.isAdvisoryTopic(destination)) {
+                    
+                    // The original destination and transaction id do not get 
filled when the message is first sent,
+                    // it is only populated if the message is routed to 
another destination like the DLQ
+                    if( message.getOriginalDestination()!=null )
+                        
message.setOriginalDestination(message.getDestination());
+                    if( message.getOriginalTransactionId()!=null )
+                        
message.setOriginalTransactionId(message.getTransactionId());
+
+                    ActiveMQTopic advisoryTopic = 
AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
                     message.setDestination(advisoryTopic);
-                    context.getBroker().send(context, message);
+                    message.setTransactionId(null);
+                    message.evictMarshlledForm();
+
+                    // Disable flow control for this since since we don't want 
to block.
+                    boolean originalFlowControl = 
context.isProducerFlowControl();
+                    try {
+                        context.setProducerFlowControl(false);
+                        context.getBroker().send(context, message);
+                    } finally {
+                        context.setProducerFlowControl(originalFlowControl);
+                    }
+                    
                 }
             }
         }


Reply via email to