Author: tabish
Date: Thu Jun 14 23:27:25 2012
New Revision: 1350425
URL: http://svn.apache.org/viewvc?rev=1350425&view=rev
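Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3882
Do not fire slow consumer or fast producer advisories when the destination is itself an advisory topic; also replace the wildcard org.apache.activemq.command.* import with explicit imports.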
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=1350425&r1=1350424&r2=1350425&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
Thu Jun 14 23:27:25 2012
@@ -20,6 +20,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
@@ -28,7 +29,20 @@ import org.apache.activemq.broker.region
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicSubscription;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.usage.Usage;
@@ -336,10 +350,12 @@ public class AdvisoryBroker extends Brok
public void slowConsumer(ConnectionContext context, Destination
destination,Subscription subs) {
super.slowConsumer(context, destination,subs);
try {
- ActiveMQTopic topic =
AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination());
- ActiveMQMessage advisoryMessage = new ActiveMQMessage();
-
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID,
subs.getConsumerInfo().getConsumerId().toString());
- fireAdvisory(context, topic, subs.getConsumerInfo(), null,
advisoryMessage);
+ if
(!AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
+ ActiveMQTopic topic =
AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination());
+ ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID,
subs.getConsumerInfo().getConsumerId().toString());
+ fireAdvisory(context, topic, subs.getConsumerInfo(), null,
advisoryMessage);
+ }
} catch (Exception e) {
handleFireFailure("slow consumer", e);
}
@@ -349,10 +365,12 @@ public class AdvisoryBroker extends Brok
public void fastProducer(ConnectionContext context,ProducerInfo
producerInfo) {
super.fastProducer(context, producerInfo);
try {
- ActiveMQTopic topic =
AdvisorySupport.getFastProducerAdvisoryTopic(producerInfo.getDestination());
- ActiveMQMessage advisoryMessage = new ActiveMQMessage();
-
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID,
producerInfo.getProducerId().toString());
- fireAdvisory(context, topic, producerInfo, null, advisoryMessage);
+ if
(!AdvisorySupport.isAdvisoryTopic(producerInfo.getDestination())) {
+ ActiveMQTopic topic =
AdvisorySupport.getFastProducerAdvisoryTopic(producerInfo.getDestination());
+ ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID,
producerInfo.getProducerId().toString());
+ fireAdvisory(context, topic, producerInfo, null,
advisoryMessage);
+ }
} catch (Exception e) {
handleFireFailure("fast producer", e);
}