Author: tabish
Date: Thu Dec 15 21:49:44 2011
New Revision: 1214964
URL: http://svn.apache.org/viewvc?rev=1214964&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3615
Reduce contention by not sending an advisory for every destination when not all
destination types are requested.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=1214964&r1=1214963&r2=1214964&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
Thu Dec 15 21:49:44 2011
@@ -40,8 +40,8 @@ import org.slf4j.LoggerFactory;
/**
* This broker filter handles tracking the state of the broker for purposes of
* publishing advisory messages to advisory consumers.
- *
- *
+ *
+ *
*/
public class AdvisoryBroker extends BrokerFilter {
@@ -54,9 +54,9 @@ public class AdvisoryBroker extends Brok
protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo>
destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
protected final ConcurrentHashMap<BrokerInfo, ActiveMQMessage>
networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>();
protected final ProducerId advisoryProducerId = new ProducerId();
-
+
private final LongSequenceGenerator messageIdGenerator = new
LongSequenceGenerator();
-
+
public AdvisoryBroker(Broker next) {
super(next);
advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
@@ -78,7 +78,7 @@ public class AdvisoryBroker extends Brok
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo
info) throws Exception {
Subscription answer = super.addConsumer(context, info);
-
+
// Don't advise advisory topics.
if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
ActiveMQTopic topic =
AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
@@ -96,15 +96,23 @@ public class AdvisoryBroker extends Brok
}
}
- // We need to replay all the previously collected destination
- // objects
- // for this newly added consumer.
- if
(AdvisorySupport.isDestinationAdvisoryTopic(info.getDestination())) {
- // Replay the destinations.
- for (Iterator<DestinationInfo> iter =
destinations.values().iterator(); iter.hasNext();) {
- DestinationInfo value = iter.next();
- ActiveMQTopic topic =
AdvisorySupport.getDestinationAdvisoryTopic(value.getDestination());
- fireAdvisory(context, topic, value, info.getConsumerId());
+ // We check here whether the Destination is Temporary Destination
specific or not since we
+ // can avoid sending advisory messages to the consumer if it only
wants Temporary Destination
+ // notifications. If its not just temporary destination related
destinations then we have
+ // to send them all, a composite destination could want both.
+ if
(AdvisorySupport.isTempDestinationAdvisoryTopic(info.getDestination())) {
+ // Replay the temporary destinations.
+ for (DestinationInfo destination : destinations.values()) {
+ if (destination.getDestination().isTemporary()) {
+ ActiveMQTopic topic =
AdvisorySupport.getDestinationAdvisoryTopic(destination.getDestination());
+ fireAdvisory(context, topic, destination,
info.getConsumerId());
+ }
+ }
+ } else if
(AdvisorySupport.isDestinationAdvisoryTopic(info.getDestination())) {
+ // Replay all the destinations.
+ for (DestinationInfo destination : destinations.values()) {
+ ActiveMQTopic topic =
AdvisorySupport.getDestinationAdvisoryTopic(destination.getDestination());
+ fireAdvisory(context, topic, destination,
info.getConsumerId());
}
}
@@ -191,7 +199,7 @@ public class AdvisoryBroker extends Brok
fireAdvisory(context, topic, info);
try {
next.removeDestination(context,
AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), -1);
- } catch (Exception expectedIfDestinationDidNotExistYet) {
+ } catch (Exception expectedIfDestinationDidNotExistYet) {
}
try {
next.removeDestination(context,
AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1);
@@ -203,7 +211,7 @@ public class AdvisoryBroker extends Brok
@Override
public void removeDestinationInfo(ConnectionContext context,
DestinationInfo destInfo) throws Exception {
- super.removeDestinationInfo(context, destInfo);
+ super.removeDestinationInfo(context, destInfo);
DestinationInfo info = destinations.remove(destInfo.getDestination());
if (info != null) {
// ensure we don't modify (and loose/overwrite) an in-flight add
advisory, so duplicate
@@ -243,7 +251,7 @@ public class AdvisoryBroker extends Brok
ActiveMQTopic topic =
AdvisorySupport.getConsumerAdvisoryTopic(dest);
consumers.remove(info.getConsumerId());
if (!dest.isTemporary() || destinations.containsKey(dest)) {
- fireConsumerAdvisory(context,dest, topic,
info.createRemoveCommand());
+ fireConsumerAdvisory(context,dest, topic,
info.createRemoveCommand());
}
}
}
@@ -279,7 +287,7 @@ public class AdvisoryBroker extends Brok
handleFireFailure("expired", e);
}
}
-
+
@Override
public void messageConsumed(ConnectionContext context, MessageReference
messageReference) {
super.messageConsumed(context, messageReference);
@@ -294,7 +302,7 @@ public class AdvisoryBroker extends Brok
handleFireFailure("consumed", e);
}
}
-
+
@Override
public void messageDelivered(ConnectionContext context, MessageReference
messageReference) {
super.messageDelivered(context, messageReference);
@@ -309,7 +317,7 @@ public class AdvisoryBroker extends Brok
handleFireFailure("delivered", e);
}
}
-
+
@Override
public void messageDiscarded(ConnectionContext context, Subscription sub,
MessageReference messageReference) {
super.messageDiscarded(context, sub, messageReference);
@@ -329,7 +337,7 @@ public class AdvisoryBroker extends Brok
handleFireFailure("discarded", e);
}
}
-
+
@Override
public void slowConsumer(ConnectionContext context, Destination
destination,Subscription subs) {
super.slowConsumer(context, destination,subs);
@@ -342,7 +350,7 @@ public class AdvisoryBroker extends Brok
handleFireFailure("slow consumer", e);
}
}
-
+
@Override
public void fastProducer(ConnectionContext context,ProducerInfo
producerInfo) {
super.fastProducer(context, producerInfo);
@@ -355,7 +363,7 @@ public class AdvisoryBroker extends Brok
handleFireFailure("fast producer", e);
}
}
-
+
@Override
public void isFull(ConnectionContext context, Destination destination,
Usage usage) {
super.isFull(context, destination, usage);
@@ -372,13 +380,13 @@ public class AdvisoryBroker extends Brok
}
}
}
-
+
@Override
- public void nowMasterBroker() {
+ public void nowMasterBroker() {
super.nowMasterBroker();
try {
ActiveMQTopic topic =
AdvisorySupport.getMasterBrokerAdvisoryTopic();
- ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+ ActiveMQMessage advisoryMessage = new ActiveMQMessage();
ConnectionContext context = new ConnectionContext();
context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
context.setBroker(getBrokerService().getBroker());
@@ -387,7 +395,7 @@ public class AdvisoryBroker extends Brok
handleFireFailure("now master broker", e);
}
}
-
+
@Override
public void sendToDeadLetterQueue(ConnectionContext context,
MessageReference messageReference,
Subscription subscription){
@@ -401,7 +409,7 @@ public class AdvisoryBroker extends Brok
}
} catch (Exception e) {
handleFireFailure("add to DLQ", e);
- }
+ }
}
@Override
@@ -476,7 +484,7 @@ public class AdvisoryBroker extends Brok
}
}
advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_COUNT,
count);
-
+
fireAdvisory(context, topic, command, targetConsumerId,
advisoryMessage);
}
@@ -505,13 +513,13 @@ public class AdvisoryBroker extends Brok
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME,
getBrokerName());
String id = getBrokerId() != null ? getBrokerId().getValue() :
"NOT_SET";
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID,
id);
-
+
String url = getBrokerService().getVmConnectorURI().toString();
if (getBrokerService().getDefaultSocketURIString() != null) {
url = getBrokerService().getDefaultSocketURIString();
}
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL,
url);
-
+
//set the data structure
advisoryMessage.setDataStructure(command);
advisoryMessage.setPersistent(false);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java?rev=1214964&r1=1214963&r2=1214964&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
Thu Dec 15 21:49:44 2011
@@ -59,7 +59,10 @@ public final class AdvisorySupport {
public static final String MSG_PROPERTY_MESSAGE_ID = "orignalMessageId";
public static final String MSG_PROPERTY_CONSUMER_COUNT = "consumerCount";
public static final String MSG_PROPERTY_DISCARDED_COUNT = "discardedCount";
-
+
+ public static final ActiveMQTopic
ALL_DESTINATIONS_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
+ TOPIC_ADVISORY_TOPIC.getPhysicalName() + "," +
QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," +
+ TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," +
TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName());
public static final ActiveMQTopic
TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," +
TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName());
private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new
ActiveMQTopic(AGENT_TOPIC);
@@ -187,7 +190,7 @@ public final class AdvisorySupport {
+ destination.getPhysicalName();
return new ActiveMQTopic(name);
}
-
+
public static ActiveMQTopic
getMessageDLQdAdvisoryTopic(ActiveMQDestination destination) {
String name = MESSAGE_DLQ_TOPIC_PREFIX +
destination.getDestinationTypeAsString() + "."
+ destination.getPhysicalName();
@@ -239,6 +242,20 @@ public final class AdvisorySupport {
return
isDestinationAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
}
+ public static boolean isTempDestinationAdvisoryTopic(ActiveMQDestination
destination) {
+ if (destination.isComposite()) {
+ ActiveMQDestination[] compositeDestinations =
destination.getCompositeDestinations();
+ for (int i = 0; i < compositeDestinations.length; i++) {
+ if (!isTempDestinationAdvisoryTopic(compositeDestinations[i]))
{
+ return false;
+ }
+ }
+ return true;
+ } else {
+ return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) ||
destination.equals(TEMP_TOPIC_ADVISORY_TOPIC);
+ }
+ }
+
public static boolean isDestinationAdvisoryTopic(ActiveMQDestination
destination) {
if (destination.isComposite()) {
ActiveMQDestination[] compositeDestinations =
destination.getCompositeDestinations();