Author: jstrachan
Date: Mon Feb 23 12:58:44 2009
New Revision: 747001
URL: http://svn.apache.org/viewvc?rev=747001&view=rev
Log:
Made topics also auto-expose themselves in Camel's context
Modified:
activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/component/CamelEndpointLoader.java
Modified:
activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/component/CamelEndpointLoader.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/component/CamelEndpointLoader.java?rev=747001&r1=747000&r2=747001&view=diff
==============================================================================
---
activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/component/CamelEndpointLoader.java
(original)
+++
activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/component/CamelEndpointLoader.java
Mon Feb 23 12:58:44 2009
@@ -28,9 +28,11 @@
import org.apache.activemq.advisory.DestinationSource;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Endpoint;
+import org.apache.camel.component.jms.JmsEndpoint;
import org.apache.camel.component.jms.JmsQueueEndpoint;
import org.apache.camel.util.ObjectHelper;
import org.apache.commons.logging.Log;
@@ -83,6 +85,15 @@
removeQueue(queue);
}
}
+ else if (destination instanceof ActiveMQTopic) {
+ ActiveMQTopic topic = (ActiveMQTopic) destination;
+ if (event.isAddOperation()) {
+ addTopic(topic);
+ }
+ else {
+ removeTopic(topic);
+ }
+ }
}
catch (Exception e) {
LOG.warn("Caught: " + e, e);
@@ -94,6 +105,11 @@
for (ActiveMQQueue queue : queues) {
addQueue(queue);
}
+
+ Set<ActiveMQTopic> topics = source.getTopics();
+ for (ActiveMQTopic topic : topics) {
+ addTopic(topic);
+ }
}
public void destroy() throws Exception {
@@ -159,4 +175,20 @@
String queueUri = getQueueUri(queue);
camelContext.removeEndpoints(queueUri);
}
+
+ protected void addTopic(ActiveMQTopic topic) throws Exception {
+ String topicUri = getTopicUri(topic);
+ ActiveMQComponent jmsComponent = getComponent();
+ Endpoint endpoint = new JmsEndpoint(topicUri, jmsComponent,
topic.getPhysicalName(), true, jmsComponent.getConfiguration());
+ camelContext.addEndpoint(topicUri, endpoint);
+ }
+
+ protected String getTopicUri(ActiveMQTopic topic) {
+ return "activemq:topic:" + topic.getPhysicalName();
+ }
+
+ protected void removeTopic(ActiveMQTopic topic) throws Exception {
+ String topicUri = getTopicUri(topic);
+ camelContext.removeEndpoints(topicUri);
+ }
}