Author: gtully
Date: Fri Aug 26 11:04:14 2011
New Revision: 1162061
URL: http://svn.apache.org/viewvc?rev=1162061&view=rev
Log:
test to explore concurrent transacted consumption with camel and prefetch
implications of pooling
Added:
activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java
(with props)
activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml
(with props)
Added:
activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java?rev=1162061&view=auto
==============================================================================
---
activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java
(added)
+++
activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java
Fri Aug 26 11:04:14 2011
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.util.Wait;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.jms.JmsMessage;
+import org.apache.camel.test.junit4.CamelSpringTestSupport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+public class TransactedConsumeTest extends CamelSpringTestSupport {
+ private static final Logger LOG =
LoggerFactory.getLogger(TransactedConsumeTest.class);
+ BrokerService broker = null;
+ int messageCount = 1000;
+
+ @Test
+ public void testConsume() throws Exception {
+
+ LOG.info("Wait for dequeue message...");
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return broker.getAdminView().getTotalDequeueCount() >=
messageCount;
+ }
+ }, 20 * 60 * 1000));
+ }
+
+ private void sendJMSMessageToKickOffRoute() throws Exception {
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory("vm://test");
+ factory.setWatchTopicAdvisories(false);
+ Connection connection = factory.createConnection();
+ connection.start();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(new
ActiveMQQueue("scp_transacted"));
+ for (int i=0; i<messageCount;i++) {
+ TextMessage message = session.createTextMessage("Some Text,
messageCount:" + i);
+ message.setJMSCorrelationID("pleaseCorrelate");
+ producer.send(message);
+ }
+ LOG.info("Sent: " + messageCount);
+ connection.close();
+ }
+
+ private BrokerService createBroker(boolean deleteAllMessages) throws
Exception {
+ BrokerService brokerService = new BrokerService();
+ brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages);
+ brokerService.setBrokerName("test");
+
+ PolicyMap policyMap = new PolicyMap();
+ PolicyEntry defaultPolicy = new PolicyEntry();
+ // defaultPolicy.setStrictOrderDispatch(false);
+ policyMap.setDefaultEntry(defaultPolicy);
+ brokerService.setDestinationPolicy(policyMap);
+
+ brokerService.setAdvisorySupport(false);
+ brokerService.setDataDirectory("target/data");
+ brokerService.addConnector("tcp://localhost:61616");
+ return brokerService;
+ }
+
+ @Override
+ protected AbstractXmlApplicationContext createApplicationContext() {
+
+ deleteDirectory("target/data");
+
+ // make broker available to recovery processing on app context start
+ try {
+ broker = createBroker(true);
+ broker.start();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to start broker", e);
+ }
+
+ try {
+ sendJMSMessageToKickOffRoute();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to fill q", e);
+ }
+
+ return new
ClassPathXmlApplicationContext("org/apache/activemq/camel/transactedconsume.xml");
+ }
+
+ static class ConnectionLog implements Processor {
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ ActiveMQTextMessage m = (ActiveMQTextMessage)
((JmsMessage)exchange.getIn()).getJmsMessage();
+ Thread.currentThread().sleep(10);
+ LOG.info("received on " + m.getConnection().toString());
+ }
+ }
+
+}
\ No newline at end of file
Propchange:
activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml?rev=1162061&view=auto
==============================================================================
---
activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml
(added)
+++
activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml
Fri Aug 26 11:04:14 2011
@@ -0,0 +1,78 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:context="http://www.springframework.org/schema/context"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
+ http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
+ http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd
+ http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd
+ ">
+
+ <context:annotation-config/>
+
+ <bean id="vhfBatchListenerJMSConnectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
+ <property name="brokerURL"
value="tcp://localhost:61616?jms.prefetchPolicy.all=1"/>
+ </bean>
+
+ <bean id="vhfBatchListenerPooledConnectionFactory"
class="org.apache.activemq.pool.PooledConnectionFactory">
+ <property name="maxConnections" value="10"/>
+ <property name="maximumActive" value="10"/>
+ <property name="connectionFactory"
ref="vhfBatchListenerJMSConnectionFactory"/>
+ </bean>
+
+ <!-- bean id="vhfBatchListenerSingleConnectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
+ <property name="reconnectOnException" value="true" />
+ <property name="targetConnectionFactory"
ref="vhfBatchListenerJMSConnectionFactory" />
+ </bean -->
+
+ <!-- JMS Transaction manager -->
+ <bean id="vhfBatchListenerJMSTransactionManager"
class="org.springframework.jms.connection.JmsTransactionManager">
+ <property name="connectionFactory"
ref="vhfBatchListenerPooledConnectionFactory"/>
+ </bean>
+
+ <!-- JMS Configuration -->
+ <bean id="vhfBatchListenerJMSConfig"
class="org.apache.camel.component.jms.JmsConfiguration">
+ <property name="connectionFactory"
ref="vhfBatchListenerPooledConnectionFactory"/>
+ <property name="transactionManager"
ref="vhfBatchListenerJMSTransactionManager"/>
+ <property name="transacted" value="true"/>
+ <property name="concurrentConsumers" value="10"/>
+ <property name="cacheLevelName" value="CACHE_CONSUMER"/>
+ </bean>
+
+ <!-- JMS Transaction policy -->
+ <bean id="vhfBatchListenerTransaction"
class="org.apache.camel.spring.spi.SpringTransactionPolicy">
+ <property name="transactionManager"
ref="vhfBatchListenerJMSTransactionManager"/>
+ </bean>
+
+ <!-- ActiveMQ component -->
+ <bean id="activemq"
class="org.apache.activemq.camel.component.ActiveMQComponent">
+ <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+ </bean>
+
+ <camelContext xmlns="http://camel.apache.org/schema/spring">
+ <route>
+ <from uri="activemq:queue:scp_transacted"/>
+ <!-- transacted /-->
+ <process ref="connectonLog"/>
+ </route>
+ </camelContext>
+
+ <bean id="connectonLog"
class="org.apache.activemq.camel.TransactedConsumeTest.ConnectionLog"/>
+</beans>
Propchange:
activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange:
activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml
------------------------------------------------------------------------------
svn:mime-type = text/xml