Repository: activemq Updated Branches: refs/heads/master c29d7477d -> d8c939a4b
no jira - exercise camel as a jms bridge with transacted consume and parallel send Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d8c939a4 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d8c939a4 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d8c939a4 Branch: refs/heads/master Commit: d8c939a4bde1496ae0629200551f754eaad38cff Parents: c29d747 Author: gtully <[email protected]> Authored: Thu Jul 5 12:16:24 2018 +0100 Committer: gtully <[email protected]> Committed: Thu Jul 5 12:16:24 2018 +0100 ---------------------------------------------------------------------- .../org/apache/activemq/camel/JmsBridge.java | 140 +++++++++++++++++++ .../org/apache/activemq/camel/jmsBridge.xml | 85 +++++++++++ 2 files changed, 225 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/d8c939a4/activemq-camel/src/test/java/org/apache/activemq/camel/JmsBridge.java ---------------------------------------------------------------------- diff --git a/activemq-camel/src/test/java/org/apache/activemq/camel/JmsBridge.java b/activemq-camel/src/test/java/org/apache/activemq/camel/JmsBridge.java new file mode 100644 index 0000000..3a1af51 --- /dev/null +++ b/activemq-camel/src/test/java/org/apache/activemq/camel/JmsBridge.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.*;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.camel.test.spring.CamelSpringTestSupport;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+import javax.jms.*;
+import javax.jms.Connection;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Exercises the Camel route defined in {@code jmsBridge.xml} as a JMS bridge
+ * between two embedded brokers: messages are produced to queue {@code from}
+ * on broker {@code pub} and are expected on queue {@code to} on broker
+ * {@code sub}.
+ * <p>
+ * Broker {@code sub} carries a plugin that rejects the first
+ * {@link #errorLimit} sends and the first {@link #errorLimit} tcp
+ * connection attempts, so the bridge only succeeds if it retries both.
+ */
+public class JmsBridge extends CamelSpringTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsBridge.class);
+
+    /** Must match the {@code concurrentSends} property in jmsBridge.xml (size of the pool towards "sub"). */
+    private static final int CONCURRENT_SENDS = 5;
+
+    /** Timeout of a single {@code receive} call while draining queue "to". */
+    private static final long RECEIVE_TIMEOUT_MILLIS = 5000;
+
+    /** Upper bound for draining queue "to"; turns a lost message into a failure instead of a hang. */
+    private static final long CONSUME_DEADLINE_MILLIS = 2 * 60 * 1000L;
+
+    BrokerService brokerSub = null;
+    BrokerService brokerPub = null;
+
+    /** Number of messages produced so far; also the number the consumer waits for. */
+    int messageCount;
+    final int backLog = 50;
+    final int errorLimit = 10;
+    /** Sends seen by the plugin on broker "sub", including the rejected ones. */
+    final AtomicInteger sendCount = new AtomicInteger();
+    /** tcp connection attempts seen by the plugin on broker "sub", including the rejected ones. */
+    final AtomicInteger connectionCount = new AtomicInteger();
+
+    /**
+     * Produces the backlog, waits until every message has crossed the bridge
+     * and then checks the number of tcp connection attempts made to "sub":
+     * {@link #errorLimit} rejected attempts plus one accepted connection per
+     * pooled producer connection.
+     */
+    @Test
+    public void testBridgeWorks() throws Exception {
+        sendJMSMessageToKickOffRoute();
+
+        consumeMessages();
+
+        LOG.info("ConnectionCount: {}", connectionCount.get());
+        assertEquals("x connections", CONCURRENT_SENDS + errorLimit, connectionCount.get());
+    }
+
+    /**
+     * Stops Camel/Spring first and then both embedded brokers, so that the
+     * fixed ports 61616 and 61617 are released for whatever runs next.
+     */
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            try {
+                stopBroker(brokerPub);
+            } finally {
+                stopBroker(brokerSub);
+            }
+        }
+    }
+
+    /** Stops the broker if it was ever created; a {@code null} broker is ignored. */
+    private static void stopBroker(BrokerService broker) throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Receives {@link #messageCount} messages from queue "to" on broker "sub".
+     * Individual receive timeouts are tolerated because the bridge is retrying
+     * injected failures, but the overall wait is bounded.
+     */
+    private void consumeMessages() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://sub");
+        factory.setWatchTopicAdvisories(false);
+        Connection connection = factory.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(new ActiveMQQueue("to"));
+
+            final long deadline = System.currentTimeMillis() + CONSUME_DEADLINE_MILLIS;
+            int messagesToConsume = messageCount;
+            while (messagesToConsume > 0) {
+                assertTrue("timed out with " + messagesToConsume + " of " + messageCount + " messages still missing",
+                        System.currentTimeMillis() < deadline);
+                Message message = consumer.receive(RECEIVE_TIMEOUT_MILLIS);
+                if (message != null) {
+                    messagesToConsume--;
+                }
+            }
+        } finally {
+            // closing the connection also closes the session and consumer
+            connection.close();
+        }
+    }
+
+    /**
+     * Sends {@link #backLog} text messages to queue "from" on broker "pub".
+     * Note that the text carries the pre-increment counter (0-based) while
+     * the "seq" property carries the post-increment value (1-based).
+     */
+    private void sendJMSMessageToKickOffRoute() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://pub");
+        factory.setWatchTopicAdvisories(false);
+        Connection connection = factory.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(new ActiveMQQueue("from"));
+
+            for (int i = 0; i < backLog; i++) {
+                TextMessage message = session.createTextMessage("Some Text, messageCount:" + messageCount++);
+                message.setIntProperty("seq", messageCount);
+                producer.send(message);
+            }
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Builds (but does not start) a broker without JMX or advisories.
+     *
+     * @param name              broker name, also used by the vm:// clients in this test
+     * @param port              tcp port to listen on; no tcp connector is added when not positive
+     * @param deleteAllMessages passed to {@code setDeleteAllMessagesOnStartup}
+     */
+    private BrokerService createBroker(String name, int port, boolean deleteAllMessages) throws Exception {
+        BrokerService brokerService = new BrokerService();
+        brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        brokerService.setBrokerName(name);
+        brokerService.setAdvisorySupport(false);
+        brokerService.setUseJmx(false);
+        brokerService.setDataDirectory("target/data");
+        if (port > 0) {
+            brokerService.addConnector("tcp://0.0.0.0:" + port);
+        }
+        return brokerService;
+    }
+
+    /**
+     * Starts broker "sub" (with the failure-injecting plugin) and broker
+     * "pub" before handing the bridge configuration to Spring, because the
+     * route connects to both brokers over tcp.
+     */
+    @Override
+    protected AbstractXmlApplicationContext createApplicationContext() {
+
+        try {
+            brokerSub = createBroker("sub", 61617, true);
+            brokerSub.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
+                @Override
+                public void send(ProducerBrokerExchange producerExchange, org.apache.activemq.command.Message messageSend) throws Exception {
+                    // fail the first errorLimit sends so the transacted consume has to roll back and retry
+                    if (sendCount.incrementAndGet() <= errorLimit) {
+                        throw new RuntimeException("You need to try send " + errorLimit + " times!");
+                    }
+                    super.send(producerExchange, messageSend);
+                }
+
+                @Override
+                public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
+                    // only tcp connections are counted; the vm:// test clients are let through uncounted
+                    String scheme = ((TransportConnector) context.getConnector()).getConnectUri().getScheme();
+                    if ("tcp".equals(scheme) && connectionCount.incrementAndGet() <= errorLimit) {
+                        throw new SecurityException("You need to try connect " + errorLimit + " times!");
+                    }
+                    super.addConnection(context, info);
+                }
+            }});
+            brokerSub.start();
+
+            brokerPub = createBroker("pub", 61616, true);
+            brokerPub.start();
+
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to start broker", e);
+        }
+
+        return new ClassPathXmlApplicationContext("org/apache/activemq/camel/jmsBridge.xml");
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/d8c939a4/activemq-camel/src/test/resources/org/apache/activemq/camel/jmsBridge.xml
----------------------------------------------------------------------
diff --git a/activemq-camel/src/test/resources/org/apache/activemq/camel/jmsBridge.xml b/activemq-camel/src/test/resources/org/apache/activemq/camel/jmsBridge.xml
new file mode 100644
index 0000000..c5951ee
--- /dev/null
+++ b/activemq-camel/src/test/resources/org/apache/activemq/camel/jmsBridge.xml
@@ -0,0 +1,85 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements. See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License. You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:context="http://www.springframework.org/schema/context"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
+        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
+        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+        ">
+
+    <!-- Bridge: transacted consume from queue "from" on the broker at port 61616,
+         send to queue "to" on the broker at port 61617. -->
+
+    <context:annotation-config/>
+
+    <context:property-placeholder properties-ref="inlineSharedValues"/>
+
+    <bean id="inlineSharedValues"
+          class="org.springframework.beans.factory.config.PropertiesFactoryBean">
+        <property name="properties">
+            <props>
+                <!-- we want to match the producer connection pool with the concurrentConsumers.
+                     Increase this to get concurrent parallel transactions -->
+                <prop key="concurrentSends">5</prop>
+            </props>
+        </property>
+    </bean>
+
+    <!-- from broker -->
+    <bean id="cfPub" class="org.apache.activemq.ActiveMQConnectionFactory">
+        <!-- we don't want messages to go to the DLQ, hence infinite redeliveries -->
+        <property name="brokerURL"
+                  value="failover:(tcp://localhost:61616)?jms.redeliveryPolicy.maximumRedeliveries=-1"/>
+    </bean>
+
+    <!-- a single pooled connection is shared by all concurrent consumers;
+         destroy-method releases the pooled connections when the context closes -->
+    <bean id="pooledCfPub" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
+        <property name="maxConnections" value="1"/>
+        <property name="connectionFactory" ref="cfPub"/>
+    </bean>
+
+    <bean id="activemq-pub" class="org.apache.activemq.camel.component.ActiveMQComponent">
+        <property name="connectionFactory" ref="pooledCfPub"/>
+        <property name="transacted" value="true"/>
+        <property name="concurrentConsumers" value="${concurrentSends}"/>
+        <property name="cacheLevelName" value="CACHE_CONSUMER"/>
+    </bean>
+
+    <!-- to broker -->
+    <bean id="cfSub" class="org.apache.activemq.ActiveMQConnectionFactory">
+        <property name="brokerURL" value="failover:(tcp://localhost:61617)"/>
+        <property name="disableTimeStampsByDefault" value="true"/>
+        <property name="copyMessageOnSend" value="false"/>
+    </bean>
+
+    <!-- one pooled connection per concurrent consumer, so sends can proceed in parallel;
+         destroy-method releases the pooled connections when the context closes -->
+    <bean id="pooledCfSub" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
+        <property name="maxConnections" value="${concurrentSends}"/>
+        <property name="connectionFactory" ref="cfSub"/>
+    </bean>
+
+    <bean id="activemq-sub" class="org.apache.activemq.camel.component.ActiveMQComponent">
+        <property name="connectionFactory" ref="pooledCfSub"/>
+        <property name="forceSendOriginalMessage" value="true"/>
+    </bean>
+
+    <camelContext xmlns="http://camel.apache.org/schema/spring" id="camel-bridge">
+        <route id="move-route">
+            <from uri="activemq-pub:queue:from"/>
+            <to uri="activemq-sub:queue:to"/>
+        </route>
+    </camelContext>
+
+</beans>
