Author: gertv
Date: Mon Apr 26 12:45:02 2010
New Revision: 938024

URL: http://svn.apache.org/viewvc?rev=938024&view=rev
Log:
SMX4-523: Use Camel Synchonization to ensure exchanges can be handled by Camel 
thread

Added:
    
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/CamelAsyncRouteTest.java
Modified:
    
servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixConsumer.java
    
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/AbstractComponentTest.java
    
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/resources/log4j.properties

Modified: 
servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixConsumer.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixConsumer.java?rev=938024&r1=938023&r2=938024&view=diff
==============================================================================
--- 
servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixConsumer.java
 (original)
+++ 
servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixConsumer.java
 Mon Apr 26 12:45:02 2010
@@ -31,7 +31,7 @@ import org.apache.servicemix.nmr.api.ser
 /**
  * A {...@link Consumer} that receives Camel {...@link 
org.apache.camel.Exchange}s and sends them into the ServiceMix NMR
  */
-public class ServiceMixConsumer extends DefaultConsumer implements 
org.apache.servicemix.nmr.api.Endpoint {
+public class ServiceMixConsumer extends DefaultConsumer implements 
org.apache.servicemix.nmr.api.Endpoint, Synchronization {
 
     private Channel channel;
 
@@ -68,25 +68,9 @@ public class ServiceMixConsumer extends 
        if (exchange.getStatus() == Status.Active) {
             try {
                org.apache.camel.Exchange camelExchange = 
getEndpoint().createExchange(exchange);
-               getProcessor().process(camelExchange);
+                camelExchange.addOnCompletion(this);
 
-                // extract the NMR Exchange from the Camel Exchange
-                
getEndpoint().getComponent().getBinding().extractNmrExchange(camelExchange);
-
-                // just copy the camelExchange back to the nmr exchange
-               exchange.getProperties().putAll(camelExchange.getProperties());
-                if (camelExchange.hasOut() && 
!camelExchange.getOut().isFault()) {
-                       getEndpoint().getComponent().getBinding().
-                               copyCamelMessageToNmrMessage(exchange.getOut(), 
camelExchange.getOut());
-                } else if (camelExchange.hasOut() && 
camelExchange.getOut().isFault()) {
-                       getEndpoint().getComponent().getBinding().
-                               
copyCamelMessageToNmrMessage(exchange.getFault(), camelExchange.getOut());
-                } else if (camelExchange.getException() != null) {
-                       throw (Exception) camelExchange.getException();
-                } else {
-                    exchange.setStatus(Status.Done);
-                }
-                channel.send(exchange);
+                getProcessor().process(camelExchange);
             } catch (Exception e) {
                 exchange.setError(e);
                 exchange.setStatus(Status.Error);
@@ -94,4 +78,32 @@ public class ServiceMixConsumer extends 
             }
         }
     }
+
+    private void handleCamelResponse(Exchange exchange, 
org.apache.camel.Exchange camelExchange) {
+        // just copy the camelExchange back to the nmr exchange
+        exchange.getProperties().putAll(camelExchange.getProperties());
+        if (camelExchange.hasOut() && !camelExchange.getOut().isFault()) {
+            getEndpoint().getComponent().getBinding().
+                copyCamelMessageToNmrMessage(exchange.getOut(), 
camelExchange.getOut());
+        } else if (camelExchange.hasOut() && camelExchange.getOut().isFault()) 
{
+            getEndpoint().getComponent().getBinding().
+                copyCamelMessageToNmrMessage(exchange.getFault(), 
camelExchange.getOut());
+        } else if (camelExchange.getException() != null) {
+            exchange.setError(camelExchange.getException());
+            exchange.setStatus(Status.Error);
+        } else {
+            exchange.setStatus(Status.Done);
+        }
+        channel.send(exchange);
+    }
+
+    public void onComplete(org.apache.camel.Exchange exchange) {
+        Exchange nmr = 
getEndpoint().getComponent().getBinding().extractNmrExchange(exchange);
+        handleCamelResponse(nmr, exchange);
+    }
+
+    public void onFailure(org.apache.camel.Exchange exchange) {
+        Exchange nmr = 
getEndpoint().getComponent().getBinding().extractNmrExchange(exchange);
+        handleCamelResponse(nmr, exchange);
+    }
 }

Modified: 
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/AbstractComponentTest.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/AbstractComponentTest.java?rev=938024&r1=938023&r2=938024&view=diff
==============================================================================
--- 
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/AbstractComponentTest.java
 (original)
+++ 
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/AbstractComponentTest.java
 Mon Apr 26 12:45:02 2010
@@ -22,6 +22,9 @@ import org.apache.servicemix.executors.E
 import org.apache.servicemix.executors.impl.ExecutorConfig;
 import org.apache.servicemix.executors.impl.ExecutorFactoryImpl;
 import org.apache.servicemix.nmr.api.Channel;
+import org.apache.servicemix.nmr.api.Exchange;
+import org.apache.servicemix.nmr.api.event.ExchangeListener;
+import org.apache.servicemix.nmr.api.service.ServiceHelper;
 import org.apache.servicemix.nmr.core.ServiceMix;
 
 /**
@@ -29,7 +32,7 @@ import org.apache.servicemix.nmr.core.Se
  * - the NMR component is available with URI prefix nmr:
  * - a client channel to the NMR can be obtained with the {...@link 
#getChannel()} method
  */
-public abstract class AbstractComponentTest extends ContextTestSupport {
+public abstract class AbstractComponentTest extends ContextTestSupport 
implements ExchangeListener {
 
     private ServiceMix nmr;
     private ServiceMixComponent component;
@@ -40,6 +43,8 @@ public abstract class AbstractComponentT
         nmr = new ServiceMix();
         nmr.setExecutorFactory(createExecutorFactory());
         nmr.init();
+        
+        nmr.getListenerRegistry().register(this, ServiceHelper.createMap());
 
         component = new ServiceMixComponent();
         component.setNmr(nmr);
@@ -57,7 +62,7 @@ public abstract class AbstractComponentT
         ExecutorConfig config = factory.getDefaultConfig();
         config.setCorePoolSize(1);
         config.setMaximumPoolSize(16);
-        config.setQueueSize(256);
+        config.setQueueSize(0);
         config.setBypassIfSynchronous(true);
 
         return factory;
@@ -82,4 +87,16 @@ public abstract class AbstractComponentT
 
         return channel;
     }
+
+    public void exchangeSent(Exchange exchange) {
+        // graciously do nothing
+    }
+
+    public void exchangeDelivered(Exchange exchange) {
+        // graciously do nothing
+    }
+
+    public void exchangeFailed(Exchange exchange) {
+        // graciously do nothing
+    }
 }

Added: 
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/CamelAsyncRouteTest.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/CamelAsyncRouteTest.java?rev=938024&view=auto
==============================================================================
--- 
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/CamelAsyncRouteTest.java
 (added)
+++ 
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/CamelAsyncRouteTest.java
 Mon Apr 26 12:45:02 2010
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.camel.nmr;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.servicemix.nmr.api.Status;
+
+/**
+ * Test case for making sure that the component behaves properly if the Camel 
route is using
+ * asynchronous elements (e.g. threads or seda queues)
+ */
+public class CamelAsyncRouteTest extends AbstractComponentTest {
+
+    private static final String HANDLED_BY_THREAD = "HandledByThread";
+    private static final int COUNT = 1000;
+
+    /* Latch to count NMR Done Exchanges */
+    private CountDownLatch done;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        done = new CountDownLatch(COUNT);
+    }
+
+    public void testCamelThreads() throws InterruptedException {
+        MockEndpoint mock = getMockEndpoint("mock:threads");
+        mock.expectedMessageCount(COUNT);
+
+        getMockEndpoint("mock:sent").expectedMessageCount(COUNT);
+
+        for (int i = 0 ; i < COUNT ; i++) {
+            template.asyncSendBody("direct:threads", "Simple message body");
+        }
+
+        assertMockEndpointsSatisfied();
+
+        for (Exchange exchange : mock.getExchanges()) {
+            Thread thread = exchange.getProperty(HANDLED_BY_THREAD, 
Thread.class);
+            assertTrue("onCompletion should have been called from the Camel 
'threads' thread pool",
+                       thread.getName().contains("Camel") && 
thread.getName().contains("Threads"));
+        }
+
+        assertTrue("All NMR exchanges should have been marked DONE",
+                   done.await(20, TimeUnit.SECONDS));        
+    }
+
+    public void testCamelSeda() throws InterruptedException {       
+        getMockEndpoint("mock:sent").expectedMessageCount(COUNT);
+        getMockEndpoint("mock:seda").expectedMessageCount(COUNT);
+
+        for (int i = 0 ; i < COUNT ; i++) {
+            template.asyncSendBody("seda:seda", "Simple message body");
+        }
+
+        assertMockEndpointsSatisfied();
+
+        assertTrue("All NMR exchanges should have been marked DONE",
+                   done.await(20, TimeUnit.SECONDS));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                from("direct:threads").to("mock:sent").to("nmr:threads");
+                from("nmr:threads")
+                    .onCompletion().process(new Processor() {
+                        public void process(Exchange exchange) throws 
Exception {
+                            exchange.setProperty(HANDLED_BY_THREAD, 
Thread.currentThread());
+                        }
+                    })
+                    .threads(5).to("mock:threads");
+
+                
from("seda:seda?concurrentConsumers=10").to("mock:sent").to("nmr:seda");
+                
from("nmr:seda").to("seda:seda-internal?waitForTaskToComplete=Never");
+                from("seda:seda-internal").to("mock:seda");
+
+            }
+        };
+    }
+
+    @Override
+    public void exchangeDelivered(org.apache.servicemix.nmr.api.Exchange 
exchange) {
+        if (exchange.getStatus().equals(Status.Done)) {
+            done.countDown();
+        }
+    }
+}

Modified: 
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/resources/log4j.properties
URL: 
http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/resources/log4j.properties?rev=938024&r1=938023&r2=938024&view=diff
==============================================================================
--- 
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/resources/log4j.properties
 (original)
+++ 
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/resources/log4j.properties
 Mon Apr 26 12:45:02 2010
@@ -20,6 +20,10 @@
 #
 log4j.rootLogger=DEBUG, out
 
+# Separate loggers for Camel and ServiceMix to reduce lock contention
+log4j.org.apache.camel=DEBUG, out
+log4j.org.apache.servicemix=DEBUG,out
+
 # CONSOLE appender not used by default
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout


Reply via email to