Repository: activemq Updated Branches: refs/heads/trunk b2e6a4166 -> 7ca25965d
added CamelRoutesBrokerPlugin for https://issues.apache.org/jira/browse/AMQ-5351 Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7ca25965 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7ca25965 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7ca25965 Branch: refs/heads/trunk Commit: 7ca25965db5b9ffdfba11b914dafbb36a3504162 Parents: b2e6a41 Author: Rob Davies <[email protected]> Authored: Tue Sep 9 16:52:21 2014 +0100 Committer: Rob Davies <[email protected]> Committed: Tue Sep 9 16:53:28 2014 +0100 ---------------------------------------------------------------------- .../camel/camelplugin/CamelRoutesBroker.java | 294 +++++++++++++++++++ .../camelplugin/CamelRoutesBrokerPlugin.java | 67 +++++ .../camelplugin/CamelPluginConfigTest.java | 127 ++++++++ .../camel/camelplugin/camel-routes-activemq.xml | 29 ++ .../activemq/camel/camelplugin/routes.xml | 22 ++ activemq-spring/pom.xml | 1 + 6 files changed, 540 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/7ca25965/activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBroker.java ---------------------------------------------------------------------- diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBroker.java b/activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBroker.java new file mode 100644 index 0000000..4d5bbb2 --- /dev/null +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBroker.java @@ -0,0 +1,294 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.camel.camelplugin; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerContext; +import org.apache.activemq.broker.BrokerFilter; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ConsumerBrokerExchange; +import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.command.ConsumerControl; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessagePull; +import org.apache.activemq.command.Response; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.spring.Utils; +import org.apache.activemq.usage.Usage; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.model.RouteDefinition; +import org.apache.camel.model.RoutesDefinition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.io.Resource; + +import java.io.File; +import java.io.InputStream; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +/** + * A StatisticsBroker You can retrieve a Map Message for a Destination - or + * Broker containing statistics as key-value pairs The message must contain a + * replyTo Destination - else its ignored + * + */ +public class CamelRoutesBroker extends BrokerFilter { + private static Logger LOG = LoggerFactory.getLogger(CamelRoutesBroker.class); + private String routesFile = ""; + private int checkPeriod = 1000; + private Resource theRoutes; + private DefaultCamelContext camelContext; + private long lastRoutesModified = -1; + private CountDownLatch countDownLatch; + + /** + * Overide methods to pause the broker whilst camel routes are loaded + */ + @Override + public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { + blockWhileLoadingCamelRoutes(); + super.send(producerExchange, message); + } + + @Override + public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { + blockWhileLoadingCamelRoutes(); + super.acknowledge(consumerExchange, ack); + } + + @Override + public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { + blockWhileLoadingCamelRoutes(); + return super.messagePull(context, pull); + } + + @Override + public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) { + blockWhileLoadingCamelRoutes(); + super.processConsumerControl(consumerExchange, control); + } + + @Override + public void reapplyInterceptor() { + blockWhileLoadingCamelRoutes(); + super.reapplyInterceptor(); + } + + @Override + public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { + blockWhileLoadingCamelRoutes(); + super.beginTransaction(context, xid); + } + + @Override + public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { + blockWhileLoadingCamelRoutes(); + return super.prepareTransaction(context, xid); + } + + @Override + public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { + blockWhileLoadingCamelRoutes(); + super.rollbackTransaction(context, xid); + } + + @Override + public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { + blockWhileLoadingCamelRoutes(); + super.commitTransaction(context, xid, onePhase); + } + + @Override + public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { + blockWhileLoadingCamelRoutes(); + super.forgetTransaction(context, transactionId); + } + + @Override + public void preProcessDispatch(MessageDispatch messageDispatch) { + blockWhileLoadingCamelRoutes(); + super.preProcessDispatch(messageDispatch); + } + + @Override + public void postProcessDispatch(MessageDispatch messageDispatch) { + blockWhileLoadingCamelRoutes(); + super.postProcessDispatch(messageDispatch); + } + + @Override + public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause) { + blockWhileLoadingCamelRoutes(); + return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause); + } + + @Override + public void messageConsumed(ConnectionContext context, MessageReference messageReference) { + blockWhileLoadingCamelRoutes(); + super.messageConsumed(context, messageReference); + } + + @Override + public void messageDelivered(ConnectionContext context, MessageReference messageReference) { + blockWhileLoadingCamelRoutes(); + super.messageDelivered(context, messageReference); + } + + @Override + public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { + blockWhileLoadingCamelRoutes(); + super.messageDiscarded(context, sub, messageReference); + } + + @Override + public void isFull(ConnectionContext context, Destination destination, Usage usage) { + blockWhileLoadingCamelRoutes(); + super.isFull(context, destination, usage); + } + + @Override + public void nowMasterBroker() { + blockWhileLoadingCamelRoutes(); + super.nowMasterBroker(); + } + + /* + * Properties + */ + + public String getRoutesFile() { + return routesFile; + } + + public void setRoutesFile(String routesFile) { + this.routesFile = routesFile; + } + + public int getCheckPeriod() { + return checkPeriod; + } + + public void setCheckPeriod(int checkPeriod) { + this.checkPeriod = checkPeriod; + } + + public CamelRoutesBroker(Broker next) { + super(next); + } + + @Override + public void start() throws Exception { + super.start(); + LOG.info("Starting CamelRoutesBroker"); + + camelContext = new DefaultCamelContext(); + camelContext.setName("EmbeddedCamel-" + getBrokerName()); + camelContext.start(); + + getBrokerService().getScheduler().executePeriodically(new Runnable() { + @Override + public void run() { + try { + loadCamelRoutes(); + } catch (Throwable e) { + LOG.error("Failed to load Camel Routes", e); + } + + } + }, getCheckPeriod()); + } + + + + @Override + public void stop() throws Exception { + CountDownLatch latch = this.countDownLatch; + if (latch != null){ + latch.countDown(); + } + if (camelContext != null){ + camelContext.stop(); + } + super.stop(); + } + + private void loadCamelRoutes() throws Exception{ + if (theRoutes == null) { + String fileToUse = getRoutesFile(); + if (fileToUse == null || fileToUse.trim().isEmpty()) { + BrokerContext brokerContext = getBrokerService().getBrokerContext(); + if (brokerContext != null) { + String uri = brokerContext.getConfigurationUrl(); + Resource resource = Utils.resourceFromString(uri); + if (resource.exists()) { + fileToUse = resource.getFile().getParent(); + fileToUse += File.separator; + fileToUse += "routes.xml"; + } + } + } + if (fileToUse != null && !fileToUse.isEmpty()){ + theRoutes = Utils.resourceFromString(fileToUse); + setRoutesFile(theRoutes.getFile().getAbsolutePath()); + } + } + if (!isStopped() && camelContext != null && theRoutes != null && theRoutes.exists()){ + long lastModified = theRoutes.lastModified(); + if (lastModified != lastRoutesModified){ + CountDownLatch latch = new CountDownLatch(1); + this.countDownLatch = latch; + lastRoutesModified = lastModified; + + List<RouteDefinition> currentRoutes = camelContext.getRouteDefinitions(); + for (RouteDefinition rd:currentRoutes){ + camelContext.stopRoute(rd); + camelContext.removeRouteDefinition(rd); + } + InputStream is = theRoutes.getInputStream(); + RoutesDefinition routesDefinition = camelContext.loadRoutesDefinition(is); + + for (RouteDefinition rd: routesDefinition.getRoutes()){ + camelContext.startRoute(rd); + } + is.close(); + latch.countDown(); + this.countDownLatch=null; + } + + + } + } + + private void blockWhileLoadingCamelRoutes(){ + CountDownLatch latch = this.countDownLatch; + if (latch != null){ + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/7ca25965/activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBrokerPlugin.java ---------------------------------------------------------------------- diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBrokerPlugin.java b/activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBrokerPlugin.java new file mode 100644 index 0000000..2e9833f --- /dev/null +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBrokerPlugin.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.camel.camelplugin; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerPlugin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A CamelRoutesBrokerPlugin + * + * load camel routes dynamically from a routes.xml file located in same directory as ActiveMQ.xml + * + * @org.apache.xbean.XBean element="camelRoutesBrokerPlugin" + * + */ +public class CamelRoutesBrokerPlugin implements BrokerPlugin { + private static Logger LOG = LoggerFactory.getLogger(CamelRoutesBrokerPlugin.class); + private String routesFile = ""; + private int checkPeriod =1000; + + public String getRoutesFile() { + return routesFile; + } + + public void setRoutesFile(String routesFile) { + this.routesFile = routesFile; + } + + public int getCheckPeriod() { + return checkPeriod; + } + + public void setCheckPeriod(int checkPeriod) { + this.checkPeriod = checkPeriod; + } + + /** + * @param broker + * @return the plug-in + * @throws Exception + * @see org.apache.activemq.broker.BrokerPlugin#installPlugin(org.apache.activemq.broker.Broker) + */ + public Broker installPlugin(Broker broker) throws Exception { + CamelRoutesBroker answer = new CamelRoutesBroker(broker); + answer.setCheckPeriod(getCheckPeriod()); + answer.setRoutesFile(getRoutesFile()); + LOG.info("Installing CamelRoutesBroker"); + return answer; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/7ca25965/activemq-camel/src/test/java/org/apache/activemq/camel/camelplugin/CamelPluginConfigTest.java ---------------------------------------------------------------------- diff --git a/activemq-camel/src/test/java/org/apache/activemq/camel/camelplugin/CamelPluginConfigTest.java b/activemq-camel/src/test/java/org/apache/activemq/camel/camelplugin/CamelPluginConfigTest.java new file mode 100644 index 0000000..5db3cc2 --- /dev/null +++ b/activemq-camel/src/test/java/org/apache/activemq/camel/camelplugin/CamelPluginConfigTest.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.camel.camelplugin; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerRegistry; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.xbean.XBeanBrokerFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.FileSystemResource; +import org.springframework.core.io.Resource; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class CamelPluginConfigTest { + + protected static final String CONF_ROOT = "src/test/resources/org/apache/activemq/camel/camelplugin/"; + protected static final String TOPIC_NAME = "test.topic"; + protected static final String QUEUE_NAME = "test.queue"; + + protected BrokerService brokerService; + protected ActiveMQConnectionFactory factory; + protected Connection producerConnection; + protected Connection consumerConnection; + protected Session consumerSession; + protected Session producerSession; + + protected int messageCount = 1000; + protected int timeOutInSeconds = 10; + + @Before + public void setUp() throws Exception { + brokerService = createBroker(new FileSystemResource(CONF_ROOT + "camel-routes-activemq.xml")); + + factory = new ActiveMQConnectionFactory(BrokerRegistry.getInstance().findFirst().getVmConnectorURI()); + consumerConnection = factory.createConnection(); + consumerConnection.start(); + producerConnection = factory.createConnection(); + producerConnection.start(); + consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + protected BrokerService createBroker(String resource) throws Exception { + return createBroker(new ClassPathResource(resource)); + } + + protected BrokerService createBroker(Resource resource) throws Exception { + + XBeanBrokerFactory factory = new XBeanBrokerFactory(); + BrokerService broker = factory.createBroker(resource.getURI()); + return broker; + } + + @After + public void tearDown() throws Exception { + if (producerConnection != null) { + producerConnection.close(); + } + if (consumerConnection != null) { + consumerConnection.close(); + } + if (brokerService != null) { + brokerService.stop(); + } + } + + @Test + public void testReRouteAll() throws Exception { + Thread.sleep(2000); + final ActiveMQQueue queue = new ActiveMQQueue(QUEUE_NAME); + + Topic topic = consumerSession.createTopic(TOPIC_NAME); + + final CountDownLatch latch = new CountDownLatch(messageCount); + MessageConsumer consumer = consumerSession.createConsumer(queue); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(javax.jms.Message message) { + try { + latch.countDown(); + } catch (Throwable e) { + e.printStackTrace(); + } + } + }); + MessageProducer producer = producerSession.createProducer(topic); + + for (int i = 0; i < messageCount; i++) { + javax.jms.Message message = producerSession.createTextMessage("test: " + i); + producer.send(message); + } + + latch.await(timeOutInSeconds, TimeUnit.SECONDS); + assertEquals(0, latch.getCount()); + + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/7ca25965/activemq-camel/src/test/resources/org/apache/activemq/camel/camelplugin/camel-routes-activemq.xml ---------------------------------------------------------------------- diff --git a/activemq-camel/src/test/resources/org/apache/activemq/camel/camelplugin/camel-routes-activemq.xml b/activemq-camel/src/test/resources/org/apache/activemq/camel/camelplugin/camel-routes-activemq.xml new file mode 100644 index 0000000..a6be02b --- /dev/null +++ b/activemq-camel/src/test/resources/org/apache/activemq/camel/camelplugin/camel-routes-activemq.xml @@ -0,0 +1,29 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<beans + xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd + http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> + + <broker xmlns="http://activemq.apache.org/schema/core" persistent="false"> + <plugins> + <camelRoutesBrokerPlugin checkPeriod="100" /> + </plugins> + </broker> +</beans> http://git-wip-us.apache.org/repos/asf/activemq/blob/7ca25965/activemq-camel/src/test/resources/org/apache/activemq/camel/camelplugin/routes.xml ---------------------------------------------------------------------- diff --git a/activemq-camel/src/test/resources/org/apache/activemq/camel/camelplugin/routes.xml b/activemq-camel/src/test/resources/org/apache/activemq/camel/camelplugin/routes.xml new file mode 100644 index 0000000..d44ae06 --- /dev/null +++ b/activemq-camel/src/test/resources/org/apache/activemq/camel/camelplugin/routes.xml @@ -0,0 +1,22 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<routes xmlns="http://camel.apache.org/schema/spring"> + <route id="test"> + <from uri="broker:topic:test.topic"/> + <to uri="broker:queue:test.queue"/> + </route> +</routes> http://git-wip-us.apache.org/repos/asf/activemq/blob/7ca25965/activemq-spring/pom.xml ---------------------------------------------------------------------- diff --git a/activemq-spring/pom.xml b/activemq-spring/pom.xml index afe1a69..6efa019 100755 --- a/activemq-spring/pom.xml +++ b/activemq-spring/pom.xml @@ -238,6 +238,7 @@ <includes> <include>${basedir}/../activemq-client/src/main/java</include> <include>${basedir}/../activemq-broker/src/main/java</include> + <include>${basedir}/../activemq-camel/src/main/java</include> <include>${basedir}/../activemq-leveldb-store/src/main/java</include> <include>${basedir}/../activemq-jdbc-store/src/main/java</include> <include>${basedir}/../activemq-kahadb-store/src/main/java</include>
