Author: gertv
Date: Mon Apr 26 12:45:02 2010
New Revision: 938024
URL: http://svn.apache.org/viewvc?rev=938024&view=rev
Log:
SMX4-523: Use Camel Synchronization to ensure exchanges can be handled by Camel
thread
Added:
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/CamelAsyncRouteTest.java
Modified:
servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixConsumer.java
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/AbstractComponentTest.java
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/resources/log4j.properties
Modified:
servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixConsumer.java
URL:
http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixConsumer.java?rev=938024&r1=938023&r2=938024&view=diff
==============================================================================
---
servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixConsumer.java
(original)
+++
servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixConsumer.java
Mon Apr 26 12:45:02 2010
@@ -31,7 +31,7 @@ import org.apache.servicemix.nmr.api.ser
/**
* A {@link Consumer} that receives Camel {@link
org.apache.camel.Exchange}s and sends them into the ServiceMix NMR
*/
-public class ServiceMixConsumer extends DefaultConsumer implements
org.apache.servicemix.nmr.api.Endpoint {
+public class ServiceMixConsumer extends DefaultConsumer implements
org.apache.servicemix.nmr.api.Endpoint, Synchronization {
private Channel channel;
@@ -68,25 +68,9 @@ public class ServiceMixConsumer extends
if (exchange.getStatus() == Status.Active) {
try {
org.apache.camel.Exchange camelExchange =
getEndpoint().createExchange(exchange);
- getProcessor().process(camelExchange);
+ camelExchange.addOnCompletion(this);
- // extract the NMR Exchange from the Camel Exchange
-
getEndpoint().getComponent().getBinding().extractNmrExchange(camelExchange);
-
- // just copy the camelExchange back to the nmr exchange
- exchange.getProperties().putAll(camelExchange.getProperties());
- if (camelExchange.hasOut() &&
!camelExchange.getOut().isFault()) {
- getEndpoint().getComponent().getBinding().
- copyCamelMessageToNmrMessage(exchange.getOut(),
camelExchange.getOut());
- } else if (camelExchange.hasOut() &&
camelExchange.getOut().isFault()) {
- getEndpoint().getComponent().getBinding().
-
copyCamelMessageToNmrMessage(exchange.getFault(), camelExchange.getOut());
- } else if (camelExchange.getException() != null) {
- throw (Exception) camelExchange.getException();
- } else {
- exchange.setStatus(Status.Done);
- }
- channel.send(exchange);
+ getProcessor().process(camelExchange);
} catch (Exception e) {
exchange.setError(e);
exchange.setStatus(Status.Error);
@@ -94,4 +78,32 @@ public class ServiceMixConsumer extends
}
}
}
+
+ private void handleCamelResponse(Exchange exchange,
org.apache.camel.Exchange camelExchange) {
+ // just copy the camelExchange back to the nmr exchange
+ exchange.getProperties().putAll(camelExchange.getProperties());
+ if (camelExchange.hasOut() && !camelExchange.getOut().isFault()) {
+ getEndpoint().getComponent().getBinding().
+ copyCamelMessageToNmrMessage(exchange.getOut(),
camelExchange.getOut());
+ } else if (camelExchange.hasOut() && camelExchange.getOut().isFault())
{
+ getEndpoint().getComponent().getBinding().
+ copyCamelMessageToNmrMessage(exchange.getFault(),
camelExchange.getOut());
+ } else if (camelExchange.getException() != null) {
+ exchange.setError(camelExchange.getException());
+ exchange.setStatus(Status.Error);
+ } else {
+ exchange.setStatus(Status.Done);
+ }
+ channel.send(exchange);
+ }
+
+ public void onComplete(org.apache.camel.Exchange exchange) {
+ Exchange nmr =
getEndpoint().getComponent().getBinding().extractNmrExchange(exchange);
+ handleCamelResponse(nmr, exchange);
+ }
+
+ public void onFailure(org.apache.camel.Exchange exchange) {
+ Exchange nmr =
getEndpoint().getComponent().getBinding().extractNmrExchange(exchange);
+ handleCamelResponse(nmr, exchange);
+ }
}
Modified:
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/AbstractComponentTest.java
URL:
http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/AbstractComponentTest.java?rev=938024&r1=938023&r2=938024&view=diff
==============================================================================
---
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/AbstractComponentTest.java
(original)
+++
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/AbstractComponentTest.java
Mon Apr 26 12:45:02 2010
@@ -22,6 +22,9 @@ import org.apache.servicemix.executors.E
import org.apache.servicemix.executors.impl.ExecutorConfig;
import org.apache.servicemix.executors.impl.ExecutorFactoryImpl;
import org.apache.servicemix.nmr.api.Channel;
+import org.apache.servicemix.nmr.api.Exchange;
+import org.apache.servicemix.nmr.api.event.ExchangeListener;
+import org.apache.servicemix.nmr.api.service.ServiceHelper;
import org.apache.servicemix.nmr.core.ServiceMix;
/**
@@ -29,7 +32,7 @@ import org.apache.servicemix.nmr.core.Se
* - the NMR component is available with URI prefix nmr:
* - a client channel to the NMR can be obtained with the {@link
#getChannel()} method
*/
-public abstract class AbstractComponentTest extends ContextTestSupport {
+public abstract class AbstractComponentTest extends ContextTestSupport
implements ExchangeListener {
private ServiceMix nmr;
private ServiceMixComponent component;
@@ -40,6 +43,8 @@ public abstract class AbstractComponentT
nmr = new ServiceMix();
nmr.setExecutorFactory(createExecutorFactory());
nmr.init();
+
+ nmr.getListenerRegistry().register(this, ServiceHelper.createMap());
component = new ServiceMixComponent();
component.setNmr(nmr);
@@ -57,7 +62,7 @@ public abstract class AbstractComponentT
ExecutorConfig config = factory.getDefaultConfig();
config.setCorePoolSize(1);
config.setMaximumPoolSize(16);
- config.setQueueSize(256);
+ config.setQueueSize(0);
config.setBypassIfSynchronous(true);
return factory;
@@ -82,4 +87,16 @@ public abstract class AbstractComponentT
return channel;
}
+
+ public void exchangeSent(Exchange exchange) {
+ // graciously do nothing
+ }
+
+ public void exchangeDelivered(Exchange exchange) {
+ // graciously do nothing
+ }
+
+ public void exchangeFailed(Exchange exchange) {
+ // graciously do nothing
+ }
}
Added:
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/CamelAsyncRouteTest.java
URL:
http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/CamelAsyncRouteTest.java?rev=938024&view=auto
==============================================================================
---
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/CamelAsyncRouteTest.java
(added)
+++
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/CamelAsyncRouteTest.java
Mon Apr 26 12:45:02 2010
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.camel.nmr;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.servicemix.nmr.api.Status;
+
+/**
+ * Test case for making sure that the component behaves properly if the Camel
route is using
+ * asynchronous elements (e.g. threads or seda queues)
+ */
+public class CamelAsyncRouteTest extends AbstractComponentTest {
+
+ private static final String HANDLED_BY_THREAD = "HandledByThread";
+ private static final int COUNT = 1000;
+
+ /* Latch to count NMR Done Exchanges */
+ private CountDownLatch done;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ done = new CountDownLatch(COUNT);
+ }
+
+ public void testCamelThreads() throws InterruptedException {
+ MockEndpoint mock = getMockEndpoint("mock:threads");
+ mock.expectedMessageCount(COUNT);
+
+ getMockEndpoint("mock:sent").expectedMessageCount(COUNT);
+
+ for (int i = 0 ; i < COUNT ; i++) {
+ template.asyncSendBody("direct:threads", "Simple message body");
+ }
+
+ assertMockEndpointsSatisfied();
+
+ for (Exchange exchange : mock.getExchanges()) {
+ Thread thread = exchange.getProperty(HANDLED_BY_THREAD,
Thread.class);
+ assertTrue("onCompletion should have been called from the Camel
'threads' thread pool",
+ thread.getName().contains("Camel") &&
thread.getName().contains("Threads"));
+ }
+
+ assertTrue("All NMR exchanges should have been marked DONE",
+ done.await(20, TimeUnit.SECONDS));
+ }
+
+ public void testCamelSeda() throws InterruptedException {
+ getMockEndpoint("mock:sent").expectedMessageCount(COUNT);
+ getMockEndpoint("mock:seda").expectedMessageCount(COUNT);
+
+ for (int i = 0 ; i < COUNT ; i++) {
+ template.asyncSendBody("seda:seda", "Simple message body");
+ }
+
+ assertMockEndpointsSatisfied();
+
+ assertTrue("All NMR exchanges should have been marked DONE",
+ done.await(20, TimeUnit.SECONDS));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+
+ @Override
+ public void configure() throws Exception {
+ from("direct:threads").to("mock:sent").to("nmr:threads");
+ from("nmr:threads")
+ .onCompletion().process(new Processor() {
+ public void process(Exchange exchange) throws
Exception {
+ exchange.setProperty(HANDLED_BY_THREAD,
Thread.currentThread());
+ }
+ })
+ .threads(5).to("mock:threads");
+
+
from("seda:seda?concurrentConsumers=10").to("mock:sent").to("nmr:seda");
+
from("nmr:seda").to("seda:seda-internal?waitForTaskToComplete=Never");
+ from("seda:seda-internal").to("mock:seda");
+
+ }
+ };
+ }
+
+ @Override
+ public void exchangeDelivered(org.apache.servicemix.nmr.api.Exchange
exchange) {
+ if (exchange.getStatus().equals(Status.Done)) {
+ done.countDown();
+ }
+ }
+}
Modified:
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/resources/log4j.properties
URL:
http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/resources/log4j.properties?rev=938024&r1=938023&r2=938024&view=diff
==============================================================================
---
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/resources/log4j.properties
(original)
+++
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/resources/log4j.properties
Mon Apr 26 12:45:02 2010
@@ -20,6 +20,10 @@
#
log4j.rootLogger=DEBUG, out
+# Separate loggers for Camel and ServiceMix to reduce lock contention
+log4j.org.apache.camel=DEBUG, out
+log4j.org.apache.servicemix=DEBUG,out
+
# CONSOLE appender not used by default
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout