Added: 
servicemix/smx4/nmr/trunk/jbi/cluster/src/test/java/org/apache/servicemix/jbi/cluster/GenericInOnlyClusterEndpointTest.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/cluster/src/test/java/org/apache/servicemix/jbi/cluster/GenericInOnlyClusterEndpointTest.java?rev=741598&view=auto
==============================================================================
--- 
servicemix/smx4/nmr/trunk/jbi/cluster/src/test/java/org/apache/servicemix/jbi/cluster/GenericInOnlyClusterEndpointTest.java
 (added)
+++ 
servicemix/smx4/nmr/trunk/jbi/cluster/src/test/java/org/apache/servicemix/jbi/cluster/GenericInOnlyClusterEndpointTest.java
 Fri Feb  6 15:50:53 2009
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jbi.cluster;
+
+import org.apache.servicemix.nmr.api.service.ServiceHelper;
+import org.apache.servicemix.nmr.api.Endpoint;
+import org.apache.servicemix.nmr.api.Channel;
+import org.apache.servicemix.nmr.api.Exchange;
+import org.apache.servicemix.nmr.api.Pattern;
+import org.apache.servicemix.nmr.api.Status;
+import org.apache.servicemix.nmr.core.util.StringSource;
+import org.apache.servicemix.jbi.cluster.requestor.Transacted;
+
+/**
+ * Sends a single InOnly exchange through the proxy endpoint registered on
+ * {@code nmr1} for every combination of {@link Transacted} mode, rollback flag
+ * and receiver {@code sendError} flag, and checks the resulting exchange status
+ * and the number of exchanges seen by the receiver on {@code nmr2}.
+ */
+public class GenericInOnlyClusterEndpointTest extends AbstractClusterEndpointTest {
+
+    private static final long TIMEOUT = 60 * 1000;
+
+    private ClusterEndpoint cluster1;
+    private ClusterEndpoint cluster2;
+    private ReceiverEndpoint receiver;
+    private ProxyEndpoint proxy;
+
+    public void testInOnlyNoTxNoRb() throws Exception {
+        assertInOnly(Transacted.None, false, false, Status.Done, 1);
+    }
+
+    public void testInOnlyNoTxNoRbInError() throws Exception {
+        assertInOnly(Transacted.None, false, true, Status.Error, 1);
+    }
+
+    public void testInOnlyNoTxRb() throws Exception {
+        assertInOnly(Transacted.None, true, false, Status.Done, 1);
+    }
+
+    public void testInOnlyNoTxRbInError() throws Exception {
+        assertInOnly(Transacted.None, true, true, Status.Done, 1);
+    }
+
+    public void testInOnlyJmsTxNoRb() throws Exception {
+        assertInOnly(Transacted.Jms, false, false, Status.Done, 1);
+    }
+
+    public void testInOnlyJmsTxNoRbInError() throws Exception {
+        assertInOnly(Transacted.Jms, false, true, Status.Error, 1);
+    }
+
+    public void testInOnlyJmsTxRb() throws Exception {
+        assertInOnly(Transacted.Jms, true, false, Status.Done, 1);
+    }
+
+    public void testInOnlyJmsTxRbInError() throws Exception {
+        assertInOnly(Transacted.Jms, true, true, Status.Done, 2);
+    }
+
+    public void testInOnlyXaTxNoRb() throws Exception {
+        assertInOnly(Transacted.Xa, false, false, Status.Done, 1);
+    }
+
+    public void testInOnlyXaTxNoRbInError() throws Exception {
+        assertInOnly(Transacted.Xa, false, true, Status.Error, 1);
+    }
+
+    public void testInOnlyXaTxRb() throws Exception {
+        assertInOnly(Transacted.Xa, true, false, Status.Done, 1);
+    }
+
+    public void testInOnlyXaTxRbInError() throws Exception {
+        assertInOnly(Transacted.Xa, true, true, Status.Done, 2);
+    }
+
+    public void testInOnlyAckTxNoRb() throws Exception {
+        assertInOnly(Transacted.ClientAck, false, false, Status.Done, 1);
+    }
+
+    public void testInOnlyAckTxNoRbInError() throws Exception {
+        assertInOnly(Transacted.ClientAck, false, true, Status.Error, 1);
+    }
+
+    public void testInOnlyAckTxRb() throws Exception {
+        assertInOnly(Transacted.ClientAck, true, false, Status.Done, 1);
+    }
+
+    public void testInOnlyAckTxRbInError() throws Exception {
+        assertInOnly(Transacted.ClientAck, true, true, Status.Done, 2);
+    }
+
+    /**
+     * Builds the route, sends one InOnly exchange with body {@code <hello/>} to
+     * the proxy endpoint and verifies the outcome.
+     * <p>
+     * NOTE(review): the transacted "rollback + error" cases expect two exchanges
+     * at the receiver, presumably because the rolled-back message is redelivered
+     * once. Confirm against the cluster endpoint implementation.
+     *
+     * @param transacted        transaction mode handed to both cluster endpoints
+     * @param rollbackOnErrors  rollback flag handed to {@link #createRoute}
+     * @param sendError         value of the receiver's sendError flag
+     * @param expectedStatus    status the exchange must carry once sendSync returns
+     * @param expectedExchanges count passed to {@code receiver.assertExchangesReceived}
+     *                          together with {@link #TIMEOUT}
+     * @throws Exception if route creation or the exchange itself fails
+     */
+    private void assertInOnly(Transacted transacted,
+                              boolean rollbackOnErrors,
+                              boolean sendError,
+                              Status expectedStatus,
+                              int expectedExchanges) throws Exception {
+        createRoute(transacted, rollbackOnErrors, false, sendError);
+
+        Channel client = nmr1.createChannel();
+        try {
+            Exchange exchange = client.createExchange(Pattern.InOnly);
+            exchange.getIn().setBody(new StringSource("<hello/>"));
+            exchange.setTarget(nmr1.getEndpointRegistry().lookup(
+                    ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME)));
+            client.sendSync(exchange);
+            assertEquals(expectedStatus, exchange.getStatus());
+        } finally {
+            // Release the channel even when the send or the status check fails.
+            client.close();
+        }
+        receiver.assertExchangesReceived(expectedExchanges, TIMEOUT);
+    }
+
+    /**
+     * Creates the two cluster endpoints, the receiver on {@code nmr2} and the
+     * proxy on {@code nmr1}, and stores them in the test's fields.
+     *
+     * @param transacted       transaction mode for both cluster endpoints
+     * @param rollbackOnErrors rollback flag for {@code cluster1}
+     * @param sendFault        forwarded to {@code createReceiver}
+     * @param sendError        forwarded to {@code createReceiver}
+     * @throws Exception if any of the endpoints cannot be created
+     */
+    protected void createRoute(Transacted transacted,
+                               boolean rollbackOnErrors,
+                               boolean sendFault,
+                               boolean sendError) throws Exception {
+        cluster1 = createCluster(nmr1, "nmr1", transacted, rollbackOnErrors);
+        // the rollbackOnErrors flag should not be used on the JMS consumer side,
+        // so cluster2 is given the negated value
+        cluster2 = createCluster(nmr2, "nmr2", transacted, !rollbackOnErrors);
+        receiver = createReceiver(nmr2, sendFault, sendError);
+        proxy = createProxy(nmr1, cluster1);
+    }
+
+    /**
+     * Checks that the listener saw the exchanges complete, destroys both cluster
+     * endpoints and then runs the inherited tear-down. Every step runs even if an
+     * earlier one throws, so a failed assertion cannot leave endpoints or the
+     * parent fixture behind.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            listener.assertExchangeCompleted();
+        } finally {
+            try {
+                // The endpoints are still null when the test failed before createRoute finished.
+                if (cluster1 != null) {
+                    cluster1.destroy();
+                }
+            } finally {
+                try {
+                    if (cluster2 != null) {
+                        cluster2.destroy();
+                    }
+                } finally {
+                    super.tearDown();
+                }
+            }
+        }
+    }
+}

Added: 
servicemix/smx4/nmr/trunk/jbi/cluster/src/test/java/org/apache/servicemix/jbi/cluster/GenericInOutClusterEndpointTest.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/cluster/src/test/java/org/apache/servicemix/jbi/cluster/GenericInOutClusterEndpointTest.java?rev=741598&view=auto
==============================================================================
--- 
servicemix/smx4/nmr/trunk/jbi/cluster/src/test/java/org/apache/servicemix/jbi/cluster/GenericInOutClusterEndpointTest.java
 (added)
+++ 
servicemix/smx4/nmr/trunk/jbi/cluster/src/test/java/org/apache/servicemix/jbi/cluster/GenericInOutClusterEndpointTest.java
 Fri Feb  6 15:50:53 2009
@@ -0,0 +1,215 @@
+package org.apache.servicemix.jbi.cluster;
+
+import org.apache.servicemix.nmr.api.Channel;
+import org.apache.servicemix.nmr.api.Exchange;
+import org.apache.servicemix.nmr.api.Pattern;
+import org.apache.servicemix.nmr.api.Endpoint;
+import org.apache.servicemix.nmr.api.Status;
+import org.apache.servicemix.nmr.api.service.ServiceHelper;
+import org.apache.servicemix.jbi.cluster.requestor.Transacted;
+import org.apache.camel.converter.jaxp.StringSource;
+
+/**
+ * Sends a single InOut exchange through the proxy endpoint registered on
+ * {@code nmr1} for every combination of {@link Transacted} mode, rollback flag
+ * and receiver {@code sendError} flag, and checks the resulting exchange status
+ * and the number of exchanges seen by the receiver on {@code nmr2}.
+ */
+public class GenericInOutClusterEndpointTest extends AbstractClusterEndpointTest {
+
+    private static final long TIMEOUT = 60 * 1000;
+
+    private ClusterEndpoint cluster1;
+    private ClusterEndpoint cluster2;
+    private ReceiverEndpoint receiver;
+    private ProxyEndpoint proxy;
+
+    public void testInOutNoTxNoRb() throws Exception {
+        assertInOut(Transacted.None, false, false, Status.Active, 2);
+    }
+
+    public void testInOutNoTxNoRbInError() throws Exception {
+        assertInOut(Transacted.None, false, true, Status.Error, 1);
+    }
+
+    public void testInOutNoTxRb() throws Exception {
+        assertInOut(Transacted.None, true, false, Status.Active, 2);
+    }
+
+    public void testInOutNoTxRbInError() throws Exception {
+        assertInOut(Transacted.None, true, true, Status.Error, 1);
+    }
+
+    public void testInOutJmsTxNoRb() throws Exception {
+        assertInOut(Transacted.Jms, false, false, Status.Active, 2);
+    }
+
+    public void testInOutJmsTxNoRbInError() throws Exception {
+        assertInOut(Transacted.Jms, false, true, Status.Error, 1);
+    }
+
+    public void testInOutJmsTxRb() throws Exception {
+        assertInOut(Transacted.Jms, true, false, Status.Active, 2);
+    }
+
+    public void testInOutJmsTxRbInError() throws Exception {
+        assertInOut(Transacted.Jms, true, true, Status.Active, 3);
+    }
+
+    public void testInOutAckTxNoRb() throws Exception {
+        assertInOut(Transacted.ClientAck, false, false, Status.Active, 2);
+    }
+
+    public void testInOutAckTxNoRbInError() throws Exception {
+        assertInOut(Transacted.ClientAck, false, true, Status.Error, 1);
+    }
+
+    public void testInOutAckTxRb() throws Exception {
+        assertInOut(Transacted.ClientAck, true, false, Status.Active, 2);
+    }
+
+    public void testInOutAckTxRbInError() throws Exception {
+        assertInOut(Transacted.ClientAck, true, true, Status.Active, 3);
+    }
+
+    /**
+     * Builds the route, sends one InOut exchange with body {@code <hello/>} to
+     * the proxy endpoint and verifies the outcome. When the expected status is
+     * {@code Active}, the exchange is then marked {@code Done} and sent back,
+     * exactly as the individual tests did before they were folded into this helper.
+     * <p>
+     * NOTE(review): the transacted "rollback + error" cases expect an Active
+     * exchange and three exchanges at the receiver, presumably because the
+     * rolled-back request is redelivered. Confirm against the cluster endpoint
+     * implementation.
+     *
+     * @param transacted        transaction mode handed to both cluster endpoints
+     * @param rollbackOnErrors  rollback flag handed to {@link #createRoute}
+     * @param sendError         value of the receiver's sendError flag
+     * @param expectedStatus    status the exchange must carry once sendSync returns
+     * @param expectedExchanges count passed to {@code receiver.assertExchangesReceived}
+     *                          together with {@link #TIMEOUT}
+     * @throws Exception if route creation or the exchange itself fails
+     */
+    private void assertInOut(Transacted transacted,
+                             boolean rollbackOnErrors,
+                             boolean sendError,
+                             Status expectedStatus,
+                             int expectedExchanges) throws Exception {
+        createRoute(transacted, rollbackOnErrors, false, sendError);
+
+        Channel client = nmr1.createChannel();
+        try {
+            Exchange exchange = client.createExchange(Pattern.InOut);
+            exchange.getIn().setBody(new StringSource("<hello/>"));
+            exchange.setTarget(nmr1.getEndpointRegistry().lookup(
+                    ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME)));
+            client.sendSync(exchange);
+            assertEquals(expectedStatus, exchange.getStatus());
+            if (expectedStatus == Status.Active) {
+                // A response came back: finish the exchange from the client side.
+                exchange.setStatus(Status.Done);
+                client.send(exchange);
+            }
+        } finally {
+            // Release the channel even when the send or the status check fails.
+            client.close();
+        }
+        receiver.assertExchangesReceived(expectedExchanges, TIMEOUT);
+    }
+
+    /**
+     * Creates the two cluster endpoints, the receiver on {@code nmr2} and the
+     * proxy on {@code nmr1}, and stores them in the test's fields.
+     *
+     * @param transacted       transaction mode for both cluster endpoints
+     * @param rollbackOnErrors rollback flag for {@code cluster1}
+     * @param sendFault        forwarded to {@code createReceiver}
+     * @param sendError        forwarded to {@code createReceiver}
+     * @throws Exception if any of the endpoints cannot be created
+     */
+    protected void createRoute(Transacted transacted,
+                               boolean rollbackOnErrors,
+                               boolean sendFault,
+                               boolean sendError) throws Exception {
+        cluster1 = createCluster(nmr1, "nmr1", transacted, rollbackOnErrors);
+        // the rollbackOnErrors flag should not be used on the JMS consumer side,
+        // so cluster2 is given the negated value
+        cluster2 = createCluster(nmr2, "nmr2", transacted, !rollbackOnErrors);
+        receiver = createReceiver(nmr2, sendFault, sendError);
+        proxy = createProxy(nmr1, cluster1);
+    }
+
+    /**
+     * Checks that the listener saw the exchanges complete, destroys both cluster
+     * endpoints and then runs the inherited tear-down. Every step runs even if an
+     * earlier one throws, so a failed assertion cannot leave endpoints or the
+     * parent fixture behind.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            listener.assertExchangeCompleted();
+        } finally {
+            try {
+                // The endpoints are still null when the test failed before createRoute finished.
+                if (cluster1 != null) {
+                    cluster1.destroy();
+                }
+            } finally {
+                try {
+                    if (cluster2 != null) {
+                        cluster2.destroy();
+                    }
+                } finally {
+                    super.tearDown();
+                }
+            }
+        }
+    }
+}

Added: 
servicemix/smx4/nmr/trunk/jbi/cluster/src/test/java/org/apache/servicemix/jbi/cluster/ReconnectTest.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/cluster/src/test/java/org/apache/servicemix/jbi/cluster/ReconnectTest.java?rev=741598&view=auto
==============================================================================
--- 
servicemix/smx4/nmr/trunk/jbi/cluster/src/test/java/org/apache/servicemix/jbi/cluster/ReconnectTest.java
 (added)
+++ 
servicemix/smx4/nmr/trunk/jbi/cluster/src/test/java/org/apache/servicemix/jbi/cluster/ReconnectTest.java
 Fri Feb  6 15:50:53 2009
@@ -0,0 +1,147 @@
+package org.apache.servicemix.jbi.cluster;
+
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.pool.XaPooledConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.servicemix.jbi.cluster.requestor.Transacted;
+import 
org.apache.servicemix.jbi.cluster.requestor.AbstractPollingRequestorPool;
+import org.apache.servicemix.jbi.cluster.requestor.ActiveMQJmsRequestorPool;
+import org.apache.servicemix.nmr.api.Channel;
+import org.apache.servicemix.nmr.api.Exchange;
+import org.apache.servicemix.nmr.api.Pattern;
+import org.apache.servicemix.nmr.api.Endpoint;
+import org.apache.servicemix.nmr.api.Status;
+import org.apache.servicemix.nmr.api.service.ServiceHelper;
+import org.apache.servicemix.nmr.core.util.StringSource;
+
+/**
+ * Load test around a broker restart: several threads send InOnly exchanges
+ * through the proxy endpoint while the second cluster endpoint is paused; the
+ * endpoint is then resumed, the broker is stopped and re-created, and the
+ * receiver must still see every exchange.
+ */
+public class ReconnectTest extends AbstractClusterEndpointTest {
+
+    private static final long TIMEOUT = 10 * 60 * 1000;
+
+    /** Address shared by the broker connector and the client connection factory. */
+    private static final String BROKER_URL = "tcp://localhost:61616";
+
+    private ClusterEndpoint cluster1;
+    private ClusterEndpoint cluster2;
+    private ReceiverEndpoint receiver;
+    private ProxyEndpoint proxy;
+
+    /**
+     * Sends {@code nbThreads * nbExchanges} InOnly exchanges concurrently while
+     * {@code cluster2} is paused, then resumes it, restarts the broker and waits
+     * for the receiver to report all exchanges. Elapsed time and throughput are
+     * printed to {@code System.err}.
+     *
+     * @throws Exception if the route cannot be built, the broker cannot be
+     *                   restarted, or an assertion fails
+     */
+    public void testLoadInOnly() throws Exception {
+        createRoute(Transacted.Jms, true, false, false);
+
+        final int nbThreads = 10;
+        final int nbExchanges = 10;
+        final ReadWriteLock lock = new ReentrantReadWriteLock();
+        final CountDownLatch latch = new CountDownLatch(nbThreads);
+        final AtomicInteger id = new AtomicInteger();
+        // Counts sender threads that hit an exception or a failed assertion.
+        final AtomicInteger failures = new AtomicInteger();
+
+        // Senders block on the read lock until the write lock is released below,
+        // so that all of them start sending at the same moment.
+        lock.writeLock().lock();
+        for (int i = 0; i < nbThreads; i++) {
+            new Thread() {
+                @Override
+                public void run() {
+                    Channel client = null;
+                    try {
+                        client = nmr1.createChannel();
+                        lock.readLock().lock();
+                        try {
+                            for (int n = 0; n < nbExchanges; n++) {
+                                Exchange exchange = client.createExchange(Pattern.InOnly);
+                                exchange.getIn().setBody(
+                                        new StringSource("<hello id='" + id.getAndIncrement() + "'/>"));
+                                exchange.setTarget(nmr1.getEndpointRegistry().lookup(
+                                        ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME)));
+                                client.sendSync(exchange);
+                                assertEquals(Status.Done, exchange.getStatus());
+                            }
+                        } finally {
+                            // Only unlock what was actually locked.
+                            lock.readLock().unlock();
+                        }
+                    } catch (Throwable t) {
+                        // Throwable, not Exception: a failed assertEquals raises an Error.
+                        t.printStackTrace();
+                        failures.incrementAndGet();
+                    } finally {
+                        latch.countDown();
+                        if (client != null) {
+                            client.close();
+                        }
+                    }
+                }
+            }.start();
+        }
+
+        long t0, t1;
+
+        cluster2.pause();
+
+        t0 = System.currentTimeMillis();
+        lock.writeLock().unlock();
+
+        latch.await();
+        if (failures.get() > 0) {
+            // Surface worker failures on the test thread; they would otherwise be lost.
+            throw new AssertionError(failures.get()
+                    + " sender thread(s) failed; see the stack traces printed above");
+        }
+
+        Thread.sleep(1000);
+
+        cluster2.resume();
+
+        broker.stop();
+        Thread.sleep(1000);
+        broker = createBroker();
+
+        receiver.assertExchangesReceived(nbThreads * nbExchanges, TIMEOUT);
+
+        t1 = System.currentTimeMillis();
+
+        System.err.println("Elapsed time: " + (t1 - t0) + " ms");
+        System.err.println("Throughput: " + (nbThreads * nbExchanges * 1000 / (t1 - t0)) + " messages/sec");
+    }
+
+    /**
+     * Returns a plain ActiveMQ connection factory pointing at {@link #BROKER_URL}.
+     * Wrapping it in an {@code XaPooledConnectionFactory} bound to the transaction
+     * manager was tried in the original code and left disabled.
+     */
+    protected ConnectionFactory createConnectionFactory() {
+        return new ActiveMQConnectionFactory(BROKER_URL);
+    }
+
+    /**
+     * Starts a persistent ActiveMQ broker with JMX disabled and a connector on
+     * {@link #BROKER_URL}.
+     *
+     * @return the started broker
+     * @throws Exception if the connector cannot be added or the broker fails to start
+     */
+    protected Service createBroker() throws Exception {
+        BrokerService service = new BrokerService();
+        service.setPersistent(true);
+        service.setUseJmx(false);
+        service.addConnector(BROKER_URL);
+        service.start();
+        return service;
+    }
+
+    /**
+     * Creates the two cluster endpoints, the receiver on {@code nmr2} and the
+     * proxy on {@code nmr1}, and stores them in the test's fields.
+     *
+     * @param transacted       transaction mode for both cluster endpoints
+     * @param rollbackOnErrors rollback flag for {@code cluster1}
+     * @param sendFault        forwarded to {@code createReceiver}
+     * @param sendError        forwarded to {@code createReceiver}
+     * @throws Exception if any of the endpoints cannot be created
+     */
+    protected void createRoute(Transacted transacted,
+                               boolean rollbackOnErrors,
+                               boolean sendFault,
+                               boolean sendError) throws Exception {
+        cluster1 = createCluster(nmr1, "nmr1", transacted, rollbackOnErrors);
+        // the rollbackOnErrors flag should not be used on the JMS consumer side,
+        // so cluster2 is given the negated value
+        cluster2 = createCluster(nmr2, "nmr2", transacted, !rollbackOnErrors);
+        receiver = createReceiver(nmr2, sendFault, sendError);
+        proxy = createProxy(nmr1, cluster1);
+    }
+
+    /**
+     * Checks that the listener saw the exchanges complete, destroys both cluster
+     * endpoints and then runs the inherited tear-down. Every step runs even if an
+     * earlier one throws, so a failed assertion cannot leave endpoints or the
+     * parent fixture behind.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            listener.assertExchangeCompleted();
+        } finally {
+            try {
+                // The endpoints are still null when the test failed before createRoute finished.
+                if (cluster1 != null) {
+                    cluster1.destroy();
+                }
+            } finally {
+                try {
+                    if (cluster2 != null) {
+                        cluster2.destroy();
+                    }
+                } finally {
+                    super.tearDown();
+                }
+            }
+        }
+    }
+}

Added: servicemix/smx4/nmr/trunk/jbi/cluster/src/test/resources/log4j.properties
URL: 
http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/cluster/src/test/resources/log4j.properties?rev=741598&view=auto
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/cluster/src/test/resources/log4j.properties 
(added)
+++ servicemix/smx4/nmr/trunk/jbi/cluster/src/test/resources/log4j.properties 
Fri Feb  6 15:50:53 2009
@@ -0,0 +1,42 @@
+# 
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed  under the  License is distributed on an "AS IS" BASIS,
+# WITHOUT  WARRANTIES OR CONDITIONS  OF ANY KIND, either  express  or
+# implied.
+#  
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+#
+# The logging properties used during tests.
+#
+log4j.rootLogger=INFO, out
+
+log4j.logger.org.springframework=INFO
+log4j.logger.org.apache.activemq=INFO
+log4j.logger.org.apache.activemq.spring=WARN
+log4j.logger.org.apache.activemq.store.journal=INFO
+log4j.logger.org.activeio.journal=INFO
+
+# Console appender: defined here but not attached to the root logger by default
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} | %-5.5p | 
%-16.16t | %-32.32c{1} | %-32.32C %4L | %m%n
+
+# File appender
+log4j.appender.out=org.apache.log4j.FileAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - 
%m%n
+log4j.appender.out.file=target/servicemix-test.log
+log4j.appender.out.append=true


Reply via email to