Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueConsumerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueConsumerTest.java?rev=1378796&view=auto ============================================================================== --- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueConsumerTest.java (added) +++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueConsumerTest.java Thu Aug 30 03:42:12 2012 @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms.tx; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.sjms.SjmsComponent; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +/** + * Verify the ability to batch transactions. + * + */ +public class BatchTransactedQueueConsumerTest extends CamelTestSupport { + + /** + * Verify that messages are being redelivered + * @throws Exception + */ + @Test + public void testEndpointConfiguredBatchTransaction() throws Exception { + // We should get two sets of 10 messages. 10 before the rollback and 10 after the rollback. + getMockEndpoint("mock:test.before").expectedMessageCount(10); + getMockEndpoint("mock:test.after").expectedMessageCount(10); + + // Send only 10 messages + for (int i = 1; i <= 10; i++) { + template.sendBody("direct:start", "Hello World " + i); + } + + getMockEndpoint("mock:test.before").assertIsSatisfied(); + getMockEndpoint("mock:test.after").assertIsSatisfied(); + + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=true"); + SjmsComponent component = new SjmsComponent(); + component.setConnectionFactory(connectionFactory); + camelContext.addComponent("sjms", component); + + return camelContext; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + + // Having a producer route helps with debugging and logging + from("direct:start") + .to("sjms:queue:transacted.consumer.test"); + + // Our test consumer route + from("sjms:queue:transacted.consumer.test?transacted=true&transactionBatchCount=10") + // first consume all the messages that are not redelivered + .choice() + .when(header("JMSRedelivered").isEqualTo("false")) + .to("log:before_log?showAll=true") + .to("mock:test.before") + // This is where we will cause the rollback after 10 messages have been sent. + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + // Get the body + String body = exchange.getIn().getBody(String.class); + + // If the message ends with 10, throw the exception + if (body.endsWith("10")) { + log.info("10th message received. Rolling back."); + exchange.getOut().setFault(true); + exchange.getOut().setBody("10th message received. Rolling back."); + } + } + }) + .otherwise() + .to("log:after_log?showAll=true") + .to("mock:test.after"); + } + }; + } +}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueProducerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueProducerTest.java?rev=1378796&view=auto ============================================================================== --- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueProducerTest.java (added) +++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueProducerTest.java Thu Aug 30 03:42:12 2012 @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms.tx; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.camel.CamelContext; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.sjms.BatchMessage; +import org.apache.camel.component.sjms.SjmsComponent; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class BatchTransactedQueueProducerTest extends CamelTestSupport { + + @Produce + protected ProducerTemplate template; + + @Test + public void testEndpointConfiguredBatchTransaction() throws Exception { + // We should see the World message twice, once for the exception + getMockEndpoint("mock:test.prebatch").expectedMessageCount(1); + getMockEndpoint("mock:test.postbatch").expectedMessageCount(30); + + List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>(); + for (int i = 1; i <= 30; i++) { + String body = "Hello World " + i; + BatchMessage<String> message = new BatchMessage<String>(body, null); + messages.add(message); + } + template.sendBody("direct:start", messages); + + getMockEndpoint("mock:test.prebatch").assertIsSatisfied(); + getMockEndpoint("mock:test.postbatch").assertIsSatisfied(); + + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=true"); + SjmsComponent sjms = new SjmsComponent(); + sjms.setConnectionFactory(connectionFactory); + camelContext.addComponent("sjms", sjms); + return camelContext; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start") + .to("log:test-before?showAll=true") + .to("sjms:queue:batch.queue?transacted=true") + .to("mock:test.prebatch"); + + from("sjms:queue:batch.queue") + .to("log:test-after?showAll=true") + .to("mock:test.postbatch"); + } + }; + } +} Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicConsumerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicConsumerTest.java?rev=1378796&view=auto ============================================================================== --- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicConsumerTest.java (added) +++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicConsumerTest.java Thu Aug 30 03:42:12 2012 @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms.tx; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.sjms.SjmsComponent; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +/** + * Verify the ability to batch transactions. + * + */ +public class BatchTransactedTopicConsumerTest extends CamelTestSupport { + + /** + * Verify that messages are being redelivered + * @throws Exception + */ + @Test + public void testEndpointConfiguredBatchTransaction() throws Exception { + // We should get two sets of 10 messages. 10 before the rollback and 10 after the rollback. + getMockEndpoint("mock:test.before").expectedMessageCount(10); + getMockEndpoint("mock:test.after").expectedMessageCount(10); + + // Send only 10 messages + for (int i = 1; i <= 10; i++) { + template.sendBody("direct:start", "Hello World " + i); + } + + getMockEndpoint("mock:test.before").assertIsSatisfied(); + getMockEndpoint("mock:test.after").assertIsSatisfied(); + + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=true"); + SjmsComponent component = new SjmsComponent(); + component.setConnectionFactory(connectionFactory); + camelContext.addComponent("sjms", component); + + return camelContext; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + + // Having a producer route helps with debugging and logging + from("direct:start") + .to("sjms:topic:transacted.consumer.test"); + + // Our test consumer route + from("sjms:topic:transacted.consumer.test?transacted=true&transactionBatchCount=10") + // first consume all the messages that are not redelivered + .choice() + .when(header("JMSRedelivered").isEqualTo("false")) + .to("log:before_log?showAll=true") + .to("mock:test.before") + // This is where we will cause the rollback after 10 messages have been sent. + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + // Get the body + String body = exchange.getIn().getBody(String.class); + + // If the message ends with 10, throw the exception + if (body.endsWith("10")) { + log.info("10th message received. Rolling back."); + exchange.getOut().setFault(true); + exchange.getOut().setBody("10th message received. Rolling back."); + } + } + }) + .otherwise() + .to("log:after_log?showAll=true") + .to("mock:test.after"); + } + }; + } +} Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicProducerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicProducerTest.java?rev=1378796&view=auto ============================================================================== --- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicProducerTest.java (added) +++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicProducerTest.java Thu Aug 30 03:42:12 2012 @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms.tx; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.camel.CamelContext; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.sjms.BatchMessage; +import org.apache.camel.component.sjms.SjmsComponent; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class BatchTransactedTopicProducerTest extends CamelTestSupport { + + @Produce + protected ProducerTemplate template; + + @Test + public void testEndpointConfiguredBatchTransaction() throws Exception { + // We should see the World message twice, once for the exception + getMockEndpoint("mock:test.prebatch").expectedMessageCount(1); + getMockEndpoint("mock:test.postbatch").expectedMessageCount(30); + + List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>(); + for (int i = 1; i <= 30; i++) { + String body = "Hello World " + i; + BatchMessage<String> message = new BatchMessage<String>(body, null); + messages.add(message); + } + template.sendBody("direct:start", messages); + + getMockEndpoint("mock:test.prebatch").assertIsSatisfied(); + getMockEndpoint("mock:test.postbatch").assertIsSatisfied(); + + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=true"); + SjmsComponent sjms = new SjmsComponent(); + sjms.setConnectionFactory(connectionFactory); + camelContext.addComponent("sjms", sjms); + return camelContext; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start") + .to("log:test-before?showAll=true") + .to("sjms:topic:batch.topic?transacted=true") + .to("mock:test.prebatch"); + + from("sjms:topic:batch.topic") + .to("log:test-after?showAll=true") + .to("mock:test.postbatch"); + } + }; + } +} Copied: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedInOnlyQueueConsumerTest.java (from r1378786, camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyQueueConsumerTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedInOnlyQueueConsumerTest.java?p2=camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedInOnlyQueueConsumerTest.java&p1=camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyQueueConsumerTest.java&r1=1378786&r2=1378796&rev=1378796&view=diff ============================================================================== --- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyQueueConsumerTest.java (original) +++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedInOnlyQueueConsumerTest.java Thu Aug 30 03:42:12 2012 @@ -14,11 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.component.sjms.consumer; +package org.apache.camel.component.sjms.tx; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.camel.CamelContext; -import org.apache.camel.CamelException; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; @@ -26,7 +25,6 @@ import org.apache.camel.builder.RouteBui import org.apache.camel.component.sjms.SjmsComponent; import org.apache.camel.component.sjms.jms.JmsMessageHeaderType; import org.apache.camel.test.junit4.CamelTestSupport; - import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,8 +63,7 @@ public class TransactedInOnlyQueueConsum protected CamelContext createCamelContext() throws Exception { CamelContext camelContext = super.createCamelContext(); - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( - "vm://broker?broker.persistent=false&broker.useJmx=true"); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=true"); SjmsComponent component = new SjmsComponent(); component.setConnectionFactory(connectionFactory); camelContext.addComponent("sjms", component); @@ -86,7 +83,7 @@ public class TransactedInOnlyQueueConsum logger.info("Begin processing Exchange ID: {}", exchange.getExchangeId()); if (!exchange.getIn().getHeader(JmsMessageHeaderType.JMSRedelivered.toString(), String.class).equalsIgnoreCase("true")) { logger.info("Exchange does not have a retry message. Set the exception and allow the retry."); - exchange.setException(new CamelException("Creating Failure")); + exchange.getOut().setFault(true); } else { logger.info("Exchange has retry header. Continue processing the message."); } Copied: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedInOnlyTopicConsumerTest.java (from r1378786, camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyTopicConsumerTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedInOnlyTopicConsumerTest.java?p2=camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedInOnlyTopicConsumerTest.java&p1=camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyTopicConsumerTest.java&r1=1378786&r2=1378796&rev=1378796&view=diff ============================================================================== --- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyTopicConsumerTest.java (original) +++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedInOnlyTopicConsumerTest.java Thu Aug 30 03:42:12 2012 @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.component.sjms.consumer; +package org.apache.camel.component.sjms.tx; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.camel.CamelContext; @@ -84,7 +84,7 @@ public class TransactedInOnlyTopicConsum logger.info("Begin processing Exchange ID: {}", exchange.getExchangeId()); if (!exchange.getIn().getHeader(JmsMessageHeaderType.JMSRedelivered.toString(), String.class).equalsIgnoreCase("true")) { logger.info("Exchange does not have a retry message. Set the exception and allow the retry."); - exchange.setException(new RuntimeCamelException("Creating Failure")); + exchange.getOut().setFault(true); } else { logger.info("Exchange has retry header. Continue processing the message."); } Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedQueueProducerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedQueueProducerTest.java?rev=1378796&view=auto ============================================================================== --- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedQueueProducerTest.java (added) +++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedQueueProducerTest.java Thu Aug 30 03:42:12 2012 @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms.tx; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.sjms.SjmsComponent; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class TransactedQueueProducerTest extends CamelTestSupport { + + @Produce + protected ProducerTemplate template; + + public TransactedQueueProducerTest() { + } + + @Override + protected boolean useJmx() { + return false; + } + + @Test + public void testTransactedProducer() throws Exception { + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Hello World 2"); + + template.sendBodyAndHeader("direct:start", "Hello World 1", "isfailed", true); + template.sendBodyAndHeader("direct:start", "Hello World 2", "isfailed", false); + + mock.assertIsSatisfied(); + } + + + /* + * @see org.apache.camel.test.junit4.CamelTestSupport#createCamelContext() + * @return + * @throws Exception + */ + @Override + protected CamelContext createCamelContext() throws Exception { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=true"); + CamelContext camelContext = super.createCamelContext(); + SjmsComponent component = new SjmsComponent(); + component.setConnectionFactory(connectionFactory); + camelContext.addComponent("sjms", component); + return camelContext; + } + + /* + * @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder() + * @return + * @throws Exception + */ + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + + from("direct:start") + .to("sjms:queue:test.queue?transacted=true") + .process( + new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + if (exchange.getIn().getHeader("isfailed", Boolean.class)) { + log.info("We failed. Should roll back."); + exchange.getOut().setFault(true); + } else { + log.info("We passed. Should commit."); + } + } + }); + + from("sjms:queue:test.queue?durableSubscriptionId=bar&transacted=true") + .to("mock:result"); + + + } + }; + } +} Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedTopicProducerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedTopicProducerTest.java?rev=1378796&view=auto ============================================================================== --- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedTopicProducerTest.java (added) +++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedTopicProducerTest.java Thu Aug 30 03:42:12 2012 @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms.tx; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.sjms.SjmsComponent; +import org.apache.camel.component.sjms.jms.ConnectionFactoryResource; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class TransactedTopicProducerTest extends CamelTestSupport { + + @Produce + protected ProducerTemplate template; + protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=true"); + + public TransactedTopicProducerTest() { + } + + @Override + protected boolean useJmx() { + return false; + } + + @Test + public void testTransactedProducer() throws Exception { + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Hello World 2"); + + template.sendBodyAndHeader("direct:start", "Hello World 1", "isfailed", true); + template.sendBodyAndHeader("direct:start", "Hello World 2", "isfailed", false); + + mock.assertIsSatisfied(); + } + + + /* + * @see org.apache.camel.test.junit4.CamelTestSupport#createCamelContext() + * @return + * @throws Exception + */ + @Override + protected CamelContext createCamelContext() throws Exception { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=true"); + ConnectionFactoryResource connectionResource = new ConnectionFactoryResource(); + connectionResource.setConnectionFactory(connectionFactory); + connectionResource.setClientId("test-connection-1"); + CamelContext camelContext = super.createCamelContext(); + SjmsComponent component = new SjmsComponent(); + component.setConnectionResource(connectionResource); + component.setMaxConnections(1); + camelContext.addComponent("sjms", component); + return camelContext; + } + + /* + * @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder() + * @return + * @throws Exception + */ + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + + from("direct:start") + .to("sjms:topic:test.topic?transacted=true") + .process( + new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + if (exchange.getIn().getHeader("isfailed", Boolean.class)) { + log.info("We failed. Should roll back."); + exchange.getOut().setFault(true); + } else { + log.info("We passed. Should commit."); + } + } + }); + + from("sjms:topic:test.topic?durableSubscriptionId=bar&transacted=true") + .to("mock:result"); + + } + }; + } +} \ No newline at end of file Modified: camel/trunk/components/camel-sjms/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/resources/log4j.properties?rev=1378796&r1=1378795&r2=1378796&view=diff ============================================================================== --- camel/trunk/components/camel-sjms/src/test/resources/log4j.properties (original) +++ camel/trunk/components/camel-sjms/src/test/resources/log4j.properties Thu Aug 30 03:42:12 2012 @@ -17,13 +17,14 @@ # # The logging properties used # -log4j.rootLogger=INFO, file +log4j.rootLogger=INFO, out # uncomment the following line to turn on Camel debugging log4j.logger.org.apache.activemq=info +log4j.logger.org.apache.activemq.transaction=trace log4j.logger.org.apache.camel=info log4j.logger.org.apache.camel.converter=info -log4j.logger.org.apache.camel.component.sjms=debug +log4j.logger.org.apache.camel.component.sjms=info # CONSOLE appender not used by default log4j.appender.out=org.apache.log4j.ConsoleAppender
