Author: hadrian
Date: Fri Apr 18 06:48:52 2008
New Revision: 649539

URL: http://svn.apache.org/viewvc?rev=649539&view=rev
Log:
Missing files from previous commit on CAMEL-469.

Added:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/Out.java
    
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java

Added: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/Out.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/Out.java?rev=649539&view=auto
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/Out.java 
(added)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/Out.java 
Fri Apr 18 06:48:52 2008
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+/**
+ * Simple mutable holder for a single value of type {@code V}.
+ *
+ * @param <V> the type of the held value
+ */
+public class Out<V> {
+    private V value;
+
+    /**
+     * Creates a holder whose value is initially {@code null}.
+     */
+    public Out() {
+        this(null);
+    }
+
+    /**
+     * Creates a holder that initially contains the given value.
+     *
+     * @param val the initial value, may be {@code null}
+     */
+    public Out(V val) {
+        this.value = val;
+    }
+
+    /**
+     * @return the value currently held, or {@code null} if none was set
+     */
+    public V get() {
+        return this.value;
+    }
+
+    /**
+     * Replaces the held value.
+     *
+     * @param val the new value, may be {@code null}
+     */
+    public void set(V val) {
+        this.value = val;
+    }
+}

Added: 
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java?rev=649539&view=auto
==============================================================================
--- 
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java
 (added)
+++ 
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java
 Fri Apr 18 06:48:52 2008
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+import static 
org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
+
+/**
+ * @version $Revision$
+ */
+public class JmsRouteRequestReplyTest extends ContextTestSupport {
+    // Names under which init() registers the JMS components in the Camel context.
+    protected static String componentName = "amq";
+    protected static String componentName1 = "amq1";
+    // Endpoint URIs used by the routes and by the requests sent from the tests.
+    // NOTE(review): "endpoingUriA" looks like a typo for "endpointUriA"; not renamed here because the field is protected.
+    protected static String endpoingUriA = componentName + ":queue:test.a";
+    protected static String endpointUriB = componentName + ":queue:test.b";
+    protected static String endpointUriB1 = componentName1 + ":queue:test.b";
+    // Request body prefix and the prefix every reply is expected to start with.
+    protected static String request = "Hello World";
+    protected static String expectedReply = "Re: " + request;
+    // Number of Task threads started by runRequestReplyThreaded().
+    protected static int maxTasks = 100;
+    // Value passed to JmsComponent.setConcurrentConsumers() in init().
+    protected static int maxServerTasks = maxTasks / 5;
+    // Number of request/reply calls issued by each Task.
+    protected static int maxCalls = 10;
+    // Guards init() so that the builder maps below are filled only once.
+    protected static AtomicBoolean inited = new AtomicBoolean(false);
+    // Context and route builders, looked up with getName() when a test is set up.
+    protected static Map<String, ContextBuilder> contextBuilders = new 
HashMap<String, ContextBuilder>();
+    protected static Map<String, RouteBuilder> routeBuilders = new 
HashMap<String, RouteBuilder>();
+    
+    /**
+     * Strategy that prepares the {@link CamelContext} for one test.
+     */
+    private interface ContextBuilder {
+        /**
+         * Configures the given context and returns the context the test should use.
+         *
+         * @param context the context to configure
+         * @return the context to run the test with
+         * @throws Exception if the context cannot be configured
+         */
+        CamelContext buildContext(CamelContext context) throws Exception;
+    }
+
+    public static class SingleNodeDeadEndRouteBuilder extends RouteBuilder {
+        public void configure() throws Exception {
+            from(endpoingUriA).process(new Processor() {
+                public void process(Exchange e) {
+                    // do nothing
+                }
+            });
+        }
+    };
+
+    /**
+     * Route that consumes from {@code endpoingUriA} and sets the out body to
+     * {@code expectedReply} followed by the part of the request body that starts at its first '-'.
+     */
+    public static class SingleNodeRouteBuilder extends RouteBuilder {
+        public void configure() throws Exception {
+            Processor replier = new Processor() {
+                public void process(Exchange exchange) {
+                    String body = exchange.getIn().getBody(String.class);
+                    exchange.getOut(true).setBody(expectedReply + body.substring(body.indexOf('-')));
+                }
+            };
+            from(endpoingUriA).process(replier);
+        }
+    }
+
+    /**
+     * Two routes: {@code endpoingUriA} is forwarded to {@code endpointUriB}, and the consumer on
+     * {@code endpointUriB} sets the out body to {@code expectedReply} followed by the part of the
+     * request body that starts at its first '-'.
+     */
+    public static class MultiNodeRouteBuilder extends RouteBuilder {
+        public void configure() throws Exception {
+            Processor replier = new Processor() {
+                public void process(Exchange exchange) {
+                    String body = exchange.getIn().getBody(String.class);
+                    exchange.getOut(true).setBody(expectedReply + body.substring(body.indexOf('-')));
+                }
+            };
+            from(endpoingUriA).to(endpointUriB);
+            from(endpointUriB).process(replier);
+        }
+    }
+
+    /**
+     * Two routes: {@code endpoingUriA} is forwarded to {@code endpointUriB1} (same queue name as
+     * {@code endpointUriB}, but addressed through the second component name), and the consumer on
+     * {@code endpointUriB1} sets the out body to {@code expectedReply} followed by the part of the
+     * request body that starts at its first '-'.
+     */
+    public static class MultiNodeDiffCompRouteBuilder extends RouteBuilder {
+        public void configure() throws Exception {
+            Processor replier = new Processor() {
+                public void process(Exchange exchange) {
+                    String body = exchange.getIn().getBody(String.class);
+                    exchange.getOut(true).setBody(expectedReply + body.substring(body.indexOf('-')));
+                }
+            };
+            from(endpoingUriA).to(endpointUriB1);
+            from(endpointUriB1).process(replier);
+        }
+    }
+
+    /**
+     * Fills {@code contextBuilders} and {@code routeBuilders} with one entry per test name.
+     * The {@code inited} flag makes sure the registration is performed only on the first call.
+     */
+    protected static void init() {
+        if (!inited.compareAndSet(false, true)) {
+            return;
+        }
+
+        // Single component with useMessageIDAsCorrelationID switched on.
+        ContextBuilder contextBuilderMessageID = new ContextBuilder() {
+            public CamelContext buildContext(CamelContext context) throws Exception {
+                context.addComponent(componentName, createJmsComponent(createConnectionFactory(), true));
+                return context;
+            }
+        };
+
+        // Single component with useMessageIDAsCorrelationID switched off.
+        ContextBuilder contextBuilderCorrelationID = new ContextBuilder() {
+            public CamelContext buildContext(CamelContext context) throws Exception {
+                context.addComponent(componentName, createJmsComponent(createConnectionFactory(), false));
+                return context;
+            }
+        };
+
+        // Two components sharing one connection factory, both with useMessageIDAsCorrelationID switched off.
+        ContextBuilder contextBuilderCorrelationIDDiffComp = new ContextBuilder() {
+            public CamelContext buildContext(CamelContext context) throws Exception {
+                ConnectionFactory connectionFactory = createConnectionFactory();
+                context.addComponent(componentName, createJmsComponent(connectionFactory, false));
+                context.addComponent(componentName1, createJmsComponent(connectionFactory, false));
+                return context;
+            }
+        };
+
+        contextBuilders.put("testUseMessageIDAsCorrelationID", contextBuilderMessageID);
+        contextBuilders.put("testUseCorrelationID", contextBuilderCorrelationID);
+        contextBuilders.put("testUseMessageIDAsCorrelationIDMultiNode", contextBuilderMessageID);
+        contextBuilders.put("testUseCorrelationIDMultiNode", contextBuilderCorrelationID);
+        contextBuilders.put("testUseCorrelationIDMultiNodeDiffComponents", contextBuilderCorrelationIDDiffComp);
+        contextBuilders.put("testUseMessageIDAsCorrelationIDTimeout", contextBuilderMessageID);
+        // NOTE(review): this entry uses contextBuilderMessageID although the test name suggests
+        // contextBuilderCorrelationID; possibly a copy-paste slip. Kept unchanged - confirm before switching.
+        contextBuilders.put("testUseCorrelationIDTimeout", contextBuilderMessageID);
+
+        routeBuilders.put("testUseMessageIDAsCorrelationID", new SingleNodeRouteBuilder());
+        routeBuilders.put("testUseCorrelationID", new SingleNodeRouteBuilder());
+        routeBuilders.put("testUseMessageIDAsCorrelationIDMultiNode", new MultiNodeRouteBuilder());
+        routeBuilders.put("testUseCorrelationIDMultiNode", new MultiNodeRouteBuilder());
+        routeBuilders.put("testUseCorrelationIDMultiNodeDiffComponents", new MultiNodeDiffCompRouteBuilder());
+        routeBuilders.put("testUseMessageIDAsCorrelationIDTimeout", new SingleNodeDeadEndRouteBuilder());
+        routeBuilders.put("testUseCorrelationIDTimeout", new SingleNodeDeadEndRouteBuilder());
+    }
+
+    /**
+     * Creates the ActiveMQ connection factory used by all tests (vm://localhost with broker persistence disabled).
+     */
+    private static ConnectionFactory createConnectionFactory() {
+        return new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+    }
+
+    /**
+     * Creates a client-acknowledge JMS component with {@code maxServerTasks} concurrent consumers.
+     *
+     * @param connectionFactory the connection factory the component should use
+     * @param useMessageIDAsCorrelationID value passed to {@code setUseMessageIDAsCorrelationID}
+     * @return the configured component, not yet added to any context
+     */
+    private static JmsComponent createJmsComponent(ConnectionFactory connectionFactory,
+                                                   boolean useMessageIDAsCorrelationID) {
+        JmsComponent jmsComponent = jmsComponentClientAcknowledge(connectionFactory);
+        jmsComponent.setUseMessageIDAsCorrelationID(useMessageIDAsCorrelationID);
+        jmsComponent.setConcurrentConsumers(maxServerTasks);
+        return jmsComponent;
+    }
+    
+    /**
+     * Worker thread that issues {@code maxCalls} request/reply calls against {@code endpoingUriA}
+     * and records whether every reply matched the text expected for its call id.
+     * Call {@link #assertSuccess()} after joining the thread to turn a recorded failure into a test failure.
+     */
+    public class Task extends Thread {
+        private final AtomicInteger counter;
+        private boolean ok = true;
+        private String message = "";
+
+        /**
+         * @param counter counter incremented once per request to obtain the call id
+         */
+        public Task(AtomicInteger counter) {
+            this.counter = counter;
+        }
+
+        public void run() {
+            for (int i = 0; i < maxCalls; i++) {
+                int callId = counter.incrementAndGet();
+                String expected = expectedReply + "-" + callId;
+                Object reply;
+                try {
+                    reply = template.requestBody(endpoingUriA, request + "-" + callId);
+                } catch (RuntimeException e) {
+                    // An uncaught exception would only end this thread and leave ok == true,
+                    // so the test would pass silently. Record the failure and stop instead.
+                    ok = false;
+                    message = "Request " + callId + " failed: " + e;
+                    return;
+                }
+                // Comparing from the expected side also treats a null reply as a mismatch
+                // instead of throwing a NullPointerException.
+                if (!expected.equals(reply)) {
+                    ok = false;
+                    message = "Unexpected reply. Expected: '" + expected
+                              + "'; Received: '" + reply + "'";
+                }
+            }
+        }
+
+        /**
+         * Fails the calling test with the recorded message if any call of this task went wrong.
+         */
+        public void assertSuccess() {
+            assertTrue(message, ok);
+        }
+    }
+
+    /**
+     * Registers the per-test context and route builders before the superclass set-up runs.
+     * NOTE(review): presumably super.setUp() calls createCamelContext() and createRouteBuilder(),
+     * which read the maps filled by init(), so the call order matters - confirm.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        init();
+        super.setUp();
+    }
+
+    /**
+     * Runs the threaded request/reply scenario; the context and route builders for this test
+     * are the ones registered under the name returned by getName().
+     */
+    public void testUseMessageIDAsCorrelationID() throws Exception {
+        runRequestReplyThreaded();
+    }
+
+    /**
+     * Runs the threaded request/reply scenario; the context and route builders for this test
+     * are the ones registered under the name returned by getName().
+     */
+    public void testUseCorrelationID() throws Exception {
+        runRequestReplyThreaded();
+    }
+    
+    /**
+     * Runs the threaded request/reply scenario; the context and route builders for this test
+     * are the ones registered under the name returned by getName().
+     */
+    public void testUseMessageIDAsCorrelationIDMultiNode() throws Exception {
+         runRequestReplyThreaded();
+    }
+
+    /**
+     * Sends a single request to {@code endpoingUriA}, waits one request-map purge interval and
+     * then checks that the requestor's request map is empty.
+     */
+    public void testUseCorrelationIDTimeout() throws Exception {
+        Object reply = template.requestBody(endpoingUriA, request);
+        // JUnit's assertEquals takes (expected, actual); the arguments were swapped before
+        assertEquals(request, reply);
+        JmsComponent c = (JmsComponent)context.getComponent(componentName);
+        // Wait 1 extra purge cycle to make sure that TimeoutMap had a chance to cleanup
+        Thread.sleep(c.getConfiguration().getRequestMapPurgePollTimeMillis());
+        // assertEquals reports the actual size on failure, unlike assertTrue(size == 0)
+        assertEquals(0, c.getRequestor().getRequestMap().size());
+    }
+
+    /**
+     * Sends a single request to {@code endpoingUriA}, waits one request-map purge interval and
+     * then checks that the requestor's deferred request map is empty.
+     */
+    public void testUseMessageIDAsCorrelationIDTimeout() throws Exception {
+        JmsComponent c = (JmsComponent)context.getComponent(componentName);
+        Object reply = template.requestBody(endpoingUriA, request);
+        // JUnit's assertEquals takes (expected, actual); the arguments were swapped before
+        assertEquals(request, reply);
+        // Wait 1 extra purge cycle to make sure that TimeoutMap had a chance to cleanup
+        Thread.sleep(c.getConfiguration().getRequestMapPurgePollTimeMillis());
+        // assertEquals reports the actual size on failure, unlike assertTrue(size == 0)
+        assertEquals(0, c.getRequestor().getDeferredRequestMap().size());
+    }
+ 
+    /**
+     * Runs the threaded request/reply scenario; the context and route builders for this test
+     * are the ones registered under the name returned by getName().
+     */
+    public void testUseCorrelationIDMultiNodeDiffComponents() throws Exception 
{
+        runRequestReplyThreaded();
+    }
+ 
+    /*
+     * REVISIT: This currently fails because there is a single instance of Requestor per JmsComponent
+     * which shares requestMap amongst JmsProducers. This is a problem in the case where the same correlationID
+     * value travels between nodes serviced by the same JmsComponent:
+     * client -> producer1 -> corrId -> consumer1 -> producer2 -> corrId -> consumer
+     * producer1 (Bum! @) <- corrId <- consumer1 <- producer2 <- corrId <- reply
+     * 
+     * @ - The request entry for corrId was already removed from JmsProducer shared requestMap
+     * 
+     * Possible ways to solve this: Each JmsProducer gets its own replyTo destination
+     * 
+
+        public void testUseCorrelationIDMultiNode() throws Exception {
+            runRequestReplyThreaded();
+        }
+    */
+    
+    /**
+     * Starts {@code maxTasks} {@link Task} threads that share one call-id counter, waits for all
+     * of them to finish and then asserts that each one completed successfully.
+     *
+     * @throws Exception if the calling thread is interrupted while waiting for the tasks
+     */
+    protected void runRequestReplyThreaded() throws Exception {
+        // Starts at -1 so that the first incrementAndGet() yields call id 0
+        final AtomicInteger counter = new AtomicInteger(-1);
+        Task[] tasks = new Task[maxTasks];
+        for (int i = 0; i < maxTasks; ++i) {
+            tasks[i] = new Task(counter);
+            tasks[i].start();
+        }
+        // Join every worker before asserting, so that one failed assertion cannot leave
+        // the remaining threads running after this method has returned.
+        for (Task task : tasks) {
+            task.join();
+        }
+        for (Task task : tasks) {
+            task.assertSuccess();
+        }
+    }
+    
+    /**
+     * Creates the default context and lets the {@code ContextBuilder} registered for the current
+     * test name configure it.
+     *
+     * @return the context returned by the registered builder
+     * @throws Exception if the context cannot be created or configured
+     */
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        ContextBuilder builder = contextBuilders.get(getName());
+        // Fail with a readable message instead of a bare NullPointerException for unregistered tests
+        assertNotNull("No ContextBuilder registered for test '" + getName() + "'", builder);
+        return builder.buildContext(camelContext);
+    }
+
+    /**
+     * Looks up the route builder registered for the current test name.
+     *
+     * @return the registered route builder, or {@code null} if none is registered under that name
+     */
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        String testName = getName();
+        RouteBuilder builder = routeBuilders.get(testName);
+        return builder;
+    }
+}


Reply via email to