...
A subscription to each of these destinations returns an ActiveMQMessage. Specific DataStructure objects (i.e. ConsumerInfo, ProducerInfo, ConnectionInfo) can be retrieved via the getDataStructure method of ActiveMQMessage.
For example:
Code Block |
...
Destination advisoryDestination = AdvisorySupport.getProducerAdvisoryTopic(destination);
MessageConsumer consumer = session.createConsumer(advisoryDestination);
consumer.setMessageListener(this);
....
public void onMessage(Message msg) {
    if (msg instanceof ActiveMQMessage) {
        ActiveMQMessage aMsg = (ActiveMQMessage) msg;
        // getDataStructure() does not throw JMSException, so check the payload type instead of catching
        DataStructure data = aMsg.getDataStructure();
        if (data instanceof ProducerInfo) {
            ProducerInfo prod = (ProducerInfo) data;
        } else {
            log.error("Failed to process message: " + msg);
        }
    }
}
|
...
Note that the consumer start/stop advisory messages also have a consumerCount header that indicates the number of active consumers on the destination when the advisory message was sent. This means you can use the following selector to be notified when there are no active consumers on a given destination:
Code Block |
consumerCount = 0
|
Destination and Message based advisories
...
The advisories that are not turned on by default (see the last column) can be enabled on a PolicyEntry in the ActiveMQ Broker Configuration. For example, to enable the message consumed advisory you can configure the following:
Code Block |
<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry topic=">" advisoryForConsumed="true" />
    </policyEntries>
  </policyMap>
</destinationPolicy>
|
...
Advisories need to be disabled both on the broker, via XML configuration:
Code Block |
<broker advisorySupport="false">...
|
or from Java code:
Code Block |
BrokerService broker = new BrokerService();
broker.setAdvisorySupport(false);
...
broker.start();
|
...
or via Java code, using the 'watchTopicAdvisories' attribute on the ActiveMQConnectionFactory:
Code Block |
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setWatchTopicAdvisories(false);
|
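The same client-side attribute can also be passed as a `jms.`-prefixed option on the broker URL. As a minimal sketch, the hypothetical helper below (`AdvisoryUrlSketch` and `withoutTopicAdvisories` are illustrative names, not part of ActiveMQ) appends that option to a simple, non-composite URL. It assumes the URL is not a nested one such as `failover:(...)`, which would need the option placed more carefully.

```java
/** Illustrative helper, not part of ActiveMQ: builds a client URL with topic advisories switched off. */
final class AdvisoryUrlSketch {
    private AdvisoryUrlSketch() {}

    /** Appends jms.watchTopicAdvisories=false to a simple broker URL, keeping any existing options. */
    static String withoutTopicAdvisories(String brokerUrl) {
        if (brokerUrl.contains("jms.watchTopicAdvisories=")) {
            // The caller already set the option explicitly; leave their choice alone.
            return brokerUrl;
        }
        // Use '&' when the URL already has a query string, '?' otherwise.
        String separator = brokerUrl.contains("?") ? "&" : "?";
        return brokerUrl + separator + "jms.watchTopicAdvisories=false";
    }
}
```

The resulting string can be handed to the ActiveMQConnectionFactory constructor that takes a broker URL, which should have the same effect as calling setWatchTopicAdvisories(false).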
...
The advisory destination objects can be obtained through the following AdvisorySupport methods.
Code Block |
AdvisorySupport.getConsumerAdvisoryTopic()
AdvisorySupport.getProducerAdvisoryTopic()
AdvisorySupport.getExpiredTopicMessageAdvisoryTopic()
AdvisorySupport.getExpiredQueueMessageAdvisoryTopic()
AdvisorySupport.getNoTopicConsumersAdvisoryTopic()
AdvisorySupport.getNoQueueConsumersAdvisoryTopic()
AdvisorySupport.getDestinationAdvisoryTopic()
//Version 5.2 onwards
AdvisorySupport.getSlowConsumerAdvisoryTopic()
AdvisorySupport.getFastProducerAdvisoryTopic()
AdvisorySupport.getMessageDiscardedAdvisoryTopic()
AdvisorySupport.getMessageDeliveredAdvisoryTopic()
AdvisorySupport.getMessageConsumedAdvisoryTopic()
AdvisorySupport.getMasterBrokerAdvisoryTopic()
AdvisorySupport.getFullAdvisoryTopic()
|
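To make clearer what these methods return, here is a rough sketch of the naming convention behind the consumer and producer advisory topics for a single queue or topic. The class and method names (`AdvisoryNameSketch`, `consumerAdvisory`, `producerAdvisory`) are hypothetical and exist only for illustration. Real code should call AdvisorySupport with the destination as its argument (as in the getProducerAdvisoryTopic(destination) example earlier), since it also handles temporary and composite destinations.

```java
/** Illustrative only: mirrors the advisory topic naming for plain queues and topics. */
final class AdvisoryNameSketch {
    private static final String PREFIX = "ActiveMQ.Advisory.";

    private AdvisoryNameSketch() {}

    /** Name of the consumer start/stop advisory topic for a queue or topic. */
    static String consumerAdvisory(boolean isQueue, String physicalName) {
        return PREFIX + "Consumer." + typeOf(isQueue) + "." + physicalName;
    }

    /** Name of the producer start/stop advisory topic for a queue or topic. */
    static String producerAdvisory(boolean isQueue, String physicalName) {
        return PREFIX + "Producer." + typeOf(isQueue) + "." + physicalName;
    }

    // Destination type segment used inside the advisory topic name.
    private static String typeOf(boolean isQueue) {
        return isQueue ? "Queue" : "Topic";
    }
}
```

Because these are ordinary topic names, a wildcard subscription such as `ActiveMQ.Advisory.Consumer.Queue.>` should receive consumer advisories for every queue.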
Some helper classes to deal with advisory messages are available in the advisories package.
Users of previous releases should see Advisory Support in ActiveMQ 3.