Author: tabish
Date: Thu Jun 14 23:27:25 2012
New Revision: 1350425

URL: http://svn.apache.org/viewvc?rev=1350425&view=rev
Log:
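Fix for https://issues.apache.org/jira/browse/AMQ-3882: do not fire slow-consumer or fast-producer advisories when the destination is itself an advisory topic.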

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=1350425&r1=1350424&r2=1350425&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Thu Jun 14 23:27:25 2012
@@ -20,6 +20,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerFilter;
 import org.apache.activemq.broker.ConnectionContext;
@@ -28,7 +29,20 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicSubscription;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.security.SecurityContext;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.usage.Usage;
@@ -336,10 +350,12 @@ public class AdvisoryBroker extends Brok
     public void slowConsumer(ConnectionContext context, Destination 
destination,Subscription subs) {
         super.slowConsumer(context, destination,subs);
         try {
-            ActiveMQTopic topic = 
AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination());
-            ActiveMQMessage advisoryMessage = new ActiveMQMessage();
-            
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, 
subs.getConsumerInfo().getConsumerId().toString());
-            fireAdvisory(context, topic, subs.getConsumerInfo(), null, 
advisoryMessage);
+            if 
(!AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
+                ActiveMQTopic topic = 
AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination());
+                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+                
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, 
subs.getConsumerInfo().getConsumerId().toString());
+                fireAdvisory(context, topic, subs.getConsumerInfo(), null, 
advisoryMessage);
+            }
         } catch (Exception e) {
             handleFireFailure("slow consumer", e);
         }
@@ -349,10 +365,12 @@ public class AdvisoryBroker extends Brok
     public void fastProducer(ConnectionContext context,ProducerInfo 
producerInfo) {
         super.fastProducer(context, producerInfo);
         try {
-            ActiveMQTopic topic = 
AdvisorySupport.getFastProducerAdvisoryTopic(producerInfo.getDestination());
-            ActiveMQMessage advisoryMessage = new ActiveMQMessage();
-            
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID, 
producerInfo.getProducerId().toString());
-            fireAdvisory(context, topic, producerInfo, null, advisoryMessage);
+            if 
(!AdvisorySupport.isAdvisoryTopic(producerInfo.getDestination())) {
+                ActiveMQTopic topic = 
AdvisorySupport.getFastProducerAdvisoryTopic(producerInfo.getDestination());
+                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+                
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID, 
producerInfo.getProducerId().toString());
+                fireAdvisory(context, topic, producerInfo, null, 
advisoryMessage);
+            }
         } catch (Exception e) {
             handleFireFailure("fast producer", e);
         }


Reply via email to