Repository: activemq
Updated Branches:
  refs/heads/master c29d7477d -> d8c939a4b


no jira - exercise camel as a jms bridge with transacted consume and parallel 
send


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d8c939a4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d8c939a4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d8c939a4

Branch: refs/heads/master
Commit: d8c939a4bde1496ae0629200551f754eaad38cff
Parents: c29d747
Author: gtully <[email protected]>
Authored: Thu Jul 5 12:16:24 2018 +0100
Committer: gtully <[email protected]>
Committed: Thu Jul 5 12:16:24 2018 +0100

----------------------------------------------------------------------
 .../org/apache/activemq/camel/JmsBridge.java    | 140 +++++++++++++++++++
 .../org/apache/activemq/camel/jmsBridge.xml     |  85 +++++++++++
 2 files changed, 225 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/d8c939a4/activemq-camel/src/test/java/org/apache/activemq/camel/JmsBridge.java
----------------------------------------------------------------------
diff --git 
a/activemq-camel/src/test/java/org/apache/activemq/camel/JmsBridge.java 
b/activemq-camel/src/test/java/org/apache/activemq/camel/JmsBridge.java
new file mode 100644
index 0000000..3a1af51
--- /dev/null
+++ b/activemq-camel/src/test/java/org/apache/activemq/camel/JmsBridge.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.*;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.camel.test.spring.CamelSpringTestSupport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+import javax.jms.*;
+import javax.jms.Connection;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Exercises Camel as a JMS bridge: messages sent to queue {@code from} on the "pub"
+ * broker are expected on queue {@code to} on the "sub" broker (the route itself is
+ * defined in {@code org/apache/activemq/camel/jmsBridge.xml}).
+ * <p>
+ * The "sub" broker carries a plugin that rejects the first {@link #errorLimit} tcp
+ * connection attempts and the first {@link #errorLimit} sends, so nothing reaches the
+ * target queue until those injected failures have been used up.
+ */
+public class JmsBridge extends CamelSpringTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsBridge.class);
+
+    /**
+     * Mirrors the {@code concurrentSends} value (5) in jmsBridge.xml, which sizes the
+     * pool of connections towards the "sub" broker. Keep the two in sync.
+     */
+    private static final int CONCURRENT_SENDS = 5;
+
+    BrokerService brokerSub = null;
+    BrokerService brokerPub = null;
+
+    /** Number of messages produced so far; also the number the consumer waits for. */
+    int messageCount;
+    /** Number of messages published to the source queue. */
+    final int backLog = 50;
+    /** Number of sends, and of tcp connection attempts, the "sub" broker rejects. */
+    final int errorLimit = 10;
+    /** Sends seen by the "sub" broker plugin, rejected ones included. */
+    AtomicInteger sendCount = new AtomicInteger();
+    /** tcp connection attempts seen by the "sub" broker plugin, rejected ones included. */
+    AtomicInteger connectionCount = new AtomicInteger();
+
+    /**
+     * Publishes the backlog, waits until all of it has arrived on the target broker and
+     * then checks how many tcp connections the bridge needed: the rejected attempts plus
+     * one per pooled connection.
+     */
+    @Test
+    public void testBridgeWorks() throws Exception {
+        sendJMSMessageToKickOffRoute();
+
+        consumeMessages();
+
+        LOG.info("ConnectionCount: {}", connectionCount.get());
+        assertEquals("x connections", CONCURRENT_SENDS + errorLimit, connectionCount.get());
+    }
+
+    /**
+     * Lets the superclass tear down first and then stops both embedded brokers, so the
+     * tcp ports are released even when the test fails.
+     * NOTE(review): relies on the superclass {@code tearDown()} being the JUnit
+     * after-hook - confirm for the Camel version in use.
+     */
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            stopQuietly(brokerPub);
+            brokerPub = null;
+            stopQuietly(brokerSub);
+            brokerSub = null;
+        }
+    }
+
+    /**
+     * Drains {@link #messageCount} messages from queue {@code to} on the "sub" broker
+     * over the vm transport. The connection is closed on every exit path.
+     */
+    private void consumeMessages() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://sub");
+        factory.setWatchTopicAdvisories(false);
+        Connection connection = factory.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(new ActiveMQQueue("to"));
+
+            int messagesToConsume = messageCount;
+            while (messagesToConsume > 0) {
+                // A null result is only a receive timeout; keep polling until every
+                // message has shown up.
+                Message message = consumer.receive(5000);
+                if (message != null) {
+                    messagesToConsume--;
+                }
+            }
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Publishes {@link #backLog} text messages to queue {@code from} on the "pub" broker
+     * over the vm transport, incrementing {@link #messageCount} once per message. The
+     * connection is closed on every exit path.
+     */
+    private void sendJMSMessageToKickOffRoute() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://pub");
+        factory.setWatchTopicAdvisories(false);
+        Connection connection = factory.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(new ActiveMQQueue("from"));
+
+            for (int i = 0; i < backLog; i++) {
+                TextMessage message = session.createTextMessage("Some Text, messageCount:" + messageCount++);
+                // Set after the increment above, so "seq" is one higher than the number in the text.
+                message.setIntProperty("seq", messageCount);
+                producer.send(message);
+            }
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Builds, but does not start, an embedded broker without JMX or advisory support.
+     *
+     * @param name              broker name
+     * @param port              tcp port to listen on; no tcp connector is added when not positive
+     * @param deleteAllMessages whether the broker deletes all messages on startup
+     * @return the configured broker
+     */
+    private BrokerService createBroker(String name, int port, boolean deleteAllMessages) throws Exception {
+        BrokerService brokerService = new BrokerService();
+        brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        brokerService.setBrokerName(name);
+        brokerService.setAdvisorySupport(false);
+        brokerService.setUseJmx(false);
+        brokerService.setDataDirectory("target/data");
+        if (port > 0) {
+            brokerService.addConnector("tcp://0.0.0.0:" + port);
+        }
+        return brokerService;
+    }
+
+    /** Stops the given broker if there is one, logging instead of propagating a failure. */
+    private static void stopQuietly(BrokerService broker) {
+        if (broker == null) {
+            return;
+        }
+        try {
+            broker.stop();
+        } catch (Exception e) {
+            LOG.warn("Failed to stop broker {}", broker.getBrokerName(), e);
+        }
+    }
+
+    /**
+     * Starts the failure-injecting "sub" broker and the plain "pub" broker, then loads
+     * the Spring context that defines the bridge route. If either broker fails to
+     * start, whatever was already started is stopped again before the failure is
+     * rethrown.
+     */
+    @Override
+    protected AbstractXmlApplicationContext createApplicationContext() {
+
+        try {
+            brokerSub = createBroker("sub", 61617, true);
+            brokerSub.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
+                @Override
+                public void send(ProducerBrokerExchange producerExchange,
+                                 org.apache.activemq.command.Message messageSend) throws Exception {
+                    if (sendCount.incrementAndGet() <= errorLimit) {
+                        throw new RuntimeException("You need to try send " + errorLimit + " times!");
+                    }
+                    super.send(producerExchange, messageSend);
+                }
+
+                @Override
+                public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
+                    TransportConnector connector = (TransportConnector) context.getConnector();
+                    boolean overTcp = "tcp".equals(connector.getConnectUri().getScheme());
+                    // Only tcp connections are counted; connections over other
+                    // transports (the test's own vm:// clients) pass through untouched.
+                    if (overTcp && connectionCount.incrementAndGet() <= errorLimit) {
+                        throw new SecurityException("You need to try connect " + errorLimit + " times!");
+                    }
+                    super.addConnection(context, info);
+                }
+            }});
+            brokerSub.start();
+
+            brokerPub = createBroker("pub", 61616, true);
+            brokerPub.start();
+
+        } catch (Exception e) {
+            // Do not leave a half-started pair of brokers holding the tcp ports.
+            stopQuietly(brokerPub);
+            brokerPub = null;
+            stopQuietly(brokerSub);
+            brokerSub = null;
+            throw new RuntimeException("Failed to start broker", e);
+        }
+
+        return new ClassPathXmlApplicationContext("org/apache/activemq/camel/jmsBridge.xml");
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/d8c939a4/activemq-camel/src/test/resources/org/apache/activemq/camel/jmsBridge.xml
----------------------------------------------------------------------
diff --git 
a/activemq-camel/src/test/resources/org/apache/activemq/camel/jmsBridge.xml 
b/activemq-camel/src/test/resources/org/apache/activemq/camel/jmsBridge.xml
new file mode 100644
index 0000000..c5951ee
--- /dev/null
+++ b/activemq-camel/src/test/resources/org/apache/activemq/camel/jmsBridge.xml
@@ -0,0 +1,85 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:context="http://www.springframework.org/schema/context"
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
+       http://www.springframework.org/schema/context 
http://www.springframework.org/schema/context/spring-context-3.0.xsd
+       http://camel.apache.org/schema/spring 
http://camel.apache.org/schema/spring/camel-spring.xsd
+    ">
+
+    <context:annotation-config/>
+
+    <context:property-placeholder properties-ref="inlineSharedValues"/>
+
+    <bean id="inlineSharedValues"
+          
class="org.springframework.beans.factory.config.PropertiesFactoryBean">
+        <property name="properties">
+            <props>
+                <!-- we want to match the producer connection pool with the 
concurrentConsumers.
+                     Increase this to get concurrent parallel transactions -->
+                <prop key="concurrentSends">5</prop>
+            </props>
+        </property>
+    </bean>
+
+    <!-- from broker -->
+    <bean id="cfPub" class="org.apache.activemq.ActiveMQConnectionFactory">
+        <!-- we don't want messages to go to the DLQ, hence infinite redeliveries -->
+        <property name="brokerURL"
+                  
value="failover:(tcp://localhost:61616)?jms.redeliveryPolicy.maximumRedeliveries=-1"/>
+    </bean>
+
+    <bean id="pooledCfPub" 
class="org.apache.activemq.pool.PooledConnectionFactory">
+        <property name="maxConnections" value="1"/>
+        <property name="connectionFactory" ref="cfPub"/>
+    </bean>
+
+    <bean id="activemq-pub" 
class="org.apache.activemq.camel.component.ActiveMQComponent">
+        <property name="connectionFactory" ref="pooledCfPub"/>
+        <property name="transacted" value="true"/>
+        <property name="concurrentConsumers" value="${concurrentSends}"/>
+        <property name="cacheLevelName" value="CACHE_CONSUMER"/>
+    </bean>
+
+    <!-- to broker -->
+    <bean id="cfSub" class="org.apache.activemq.ActiveMQConnectionFactory">
+        <property name="brokerURL" value="failover:(tcp://localhost:61617)"/>
+        <property name="disableTimeStampsByDefault" value="true"/>
+        <property name="copyMessageOnSend" value="false"/>
+    </bean>
+
+    <bean id="pooledCfSub" 
class="org.apache.activemq.pool.PooledConnectionFactory">
+        <property name="maxConnections" value="${concurrentSends}"/>
+        <property name="connectionFactory" ref="cfSub"/>
+    </bean>
+
+    <bean id="activemq-sub" 
class="org.apache.activemq.camel.component.ActiveMQComponent">
+        <property name="connectionFactory" ref="pooledCfSub"/>
+        <property name="forceSendOriginalMessage" value="true"/>
+    </bean>
+
+    <camelContext xmlns="http://camel.apache.org/schema/spring" id="camel-bridge">
+        <route id="move-route">
+            <from uri="activemq-pub:queue:from"/>
+            <to uri="activemq-sub:queue:to"/>
+        </route>
+    </camelContext>
+
+</beans>

Reply via email to