Added: servicemix/smx4/nmr/trunk/jbi/cluster/src/test/java/org/apache/servicemix/jbi/cluster/GenericInOnlyClusterEndpointTest.java URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/cluster/src/test/java/org/apache/servicemix/jbi/cluster/GenericInOnlyClusterEndpointTest.java?rev=741598&view=auto ============================================================================== --- servicemix/smx4/nmr/trunk/jbi/cluster/src/test/java/org/apache/servicemix/jbi/cluster/GenericInOnlyClusterEndpointTest.java (added) +++ servicemix/smx4/nmr/trunk/jbi/cluster/src/test/java/org/apache/servicemix/jbi/cluster/GenericInOnlyClusterEndpointTest.java Fri Feb 6 15:50:53 2009 @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicemix.jbi.cluster; + +import org.apache.servicemix.nmr.api.service.ServiceHelper; +import org.apache.servicemix.nmr.api.Endpoint; +import org.apache.servicemix.nmr.api.Channel; +import org.apache.servicemix.nmr.api.Exchange; +import org.apache.servicemix.nmr.api.Pattern; +import org.apache.servicemix.nmr.api.Status; +import org.apache.servicemix.nmr.core.util.StringSource; +import org.apache.servicemix.jbi.cluster.requestor.Transacted; + +public class GenericInOnlyClusterEndpointTest extends AbstractClusterEndpointTest { + + private static final long TIMEOUT = 60 * 1000; + + private ClusterEndpoint cluster1; + private ClusterEndpoint cluster2; + private ReceiverEndpoint receiver; + private ProxyEndpoint proxy; + + public void testInOnlyNoTxNoRb() throws Exception { + createRoute(Transacted.None, false, false, false); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOnly); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Done, exchange.getStatus()); + client.close(); + receiver.assertExchangesReceived(1, TIMEOUT); + } + + public void testInOnlyNoTxNoRbInError() throws Exception { + createRoute(Transacted.None, false, false, true); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOnly); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Error, exchange.getStatus()); + client.close(); + receiver.assertExchangesReceived(1, TIMEOUT); + } + + public void testInOnlyNoTxRb() throws Exception { + createRoute(Transacted.None, true, false, false); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOnly); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Done, exchange.getStatus()); + client.close(); + receiver.assertExchangesReceived(1, TIMEOUT); + } + + public void testInOnlyNoTxRbInError() throws Exception { + createRoute(Transacted.None, true, false, true); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOnly); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Done, exchange.getStatus()); + client.close(); + receiver.assertExchangesReceived(1, TIMEOUT); + } + + public void testInOnlyJmsTxNoRb() throws Exception { + createRoute(Transacted.Jms, false, false, false); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOnly); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Done, exchange.getStatus()); + client.close(); + receiver.assertExchangesReceived(1, TIMEOUT); + } + + public void testInOnlyJmsTxNoRbInError() throws Exception { + createRoute(Transacted.Jms, false, false, true); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOnly); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Error, exchange.getStatus()); + client.close(); + receiver.assertExchangesReceived(1, TIMEOUT); + } + + public void testInOnlyJmsTxRb() throws Exception { + createRoute(Transacted.Jms, true, false, false); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOnly); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Done, exchange.getStatus()); + client.close(); + receiver.assertExchangesReceived(1, TIMEOUT); + } + + public void testInOnlyJmsTxRbInError() throws Exception { + createRoute(Transacted.Jms, true, false, true); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOnly); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Done, exchange.getStatus()); + client.close(); + receiver.assertExchangesReceived(2, TIMEOUT); + } + + public void testInOnlyXaTxNoRb() throws Exception { + createRoute(Transacted.Xa, false, false, false); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOnly); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Done, exchange.getStatus()); + client.close(); + receiver.assertExchangesReceived(1, TIMEOUT); + } + + public void testInOnlyXaTxNoRbInError() throws Exception { + createRoute(Transacted.Xa, false, false, true); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOnly); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Error, exchange.getStatus()); + client.close(); + receiver.assertExchangesReceived(1, TIMEOUT); + } + + public void testInOnlyXaTxRb() throws Exception { + createRoute(Transacted.Xa, true, false, false); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOnly); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Done, exchange.getStatus()); + client.close(); + receiver.assertExchangesReceived(1, TIMEOUT); + } + + public void testInOnlyXaTxRbInError() throws Exception { + createRoute(Transacted.Xa, true, false, true); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOnly); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Done, exchange.getStatus()); + client.close(); + receiver.assertExchangesReceived(2, TIMEOUT); + } + + public void testInOnlyAckTxNoRb() throws Exception { + createRoute(Transacted.ClientAck, false, false, false); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOnly); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Done, exchange.getStatus()); + client.close(); + receiver.assertExchangesReceived(1, TIMEOUT); + } + + public void testInOnlyAckTxNoRbInError() throws Exception { + createRoute(Transacted.ClientAck, false, false, true); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOnly); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Error, exchange.getStatus()); + client.close(); + receiver.assertExchangesReceived(1, TIMEOUT); + } + + public void testInOnlyAckTxRb() throws Exception { + createRoute(Transacted.ClientAck, true, false, false); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOnly); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Done, exchange.getStatus()); + client.close(); + receiver.assertExchangesReceived(1, TIMEOUT); + } + + public void testInOnlyAckTxRbInError() throws Exception { + createRoute(Transacted.ClientAck, true, false, true); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOnly); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Done, exchange.getStatus()); + client.close(); + receiver.assertExchangesReceived(2, TIMEOUT); + } + + protected void createRoute(Transacted transacted, + boolean rollbackOnErrors, + boolean sendFault, + boolean sendError) throws Exception { + cluster1 = createCluster(nmr1, "nmr1", transacted, rollbackOnErrors); + cluster2 = createCluster(nmr2, "nmr2", transacted, !rollbackOnErrors); // the rollbackOnErrors flag should not be used on the JMS consumer side + receiver = createReceiver(nmr2, sendFault, sendError); + proxy = createProxy(nmr1, cluster1); + } + + @Override + protected void setUp() throws Exception { + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + listener.assertExchangeCompleted(); + cluster1.destroy(); + cluster2.destroy(); + super.tearDown(); + } +}
Added: servicemix/smx4/nmr/trunk/jbi/cluster/src/test/java/org/apache/servicemix/jbi/cluster/GenericInOutClusterEndpointTest.java URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/cluster/src/test/java/org/apache/servicemix/jbi/cluster/GenericInOutClusterEndpointTest.java?rev=741598&view=auto ============================================================================== --- servicemix/smx4/nmr/trunk/jbi/cluster/src/test/java/org/apache/servicemix/jbi/cluster/GenericInOutClusterEndpointTest.java (added) +++ servicemix/smx4/nmr/trunk/jbi/cluster/src/test/java/org/apache/servicemix/jbi/cluster/GenericInOutClusterEndpointTest.java Fri Feb 6 15:50:53 2009 @@ -0,0 +1,215 @@ +package org.apache.servicemix.jbi.cluster; + +import org.apache.servicemix.nmr.api.Channel; +import org.apache.servicemix.nmr.api.Exchange; +import org.apache.servicemix.nmr.api.Pattern; +import org.apache.servicemix.nmr.api.Endpoint; +import org.apache.servicemix.nmr.api.Status; +import org.apache.servicemix.nmr.api.service.ServiceHelper; +import org.apache.servicemix.jbi.cluster.requestor.Transacted; +import org.apache.camel.converter.jaxp.StringSource; + +public class GenericInOutClusterEndpointTest extends AbstractClusterEndpointTest { + + private static final long TIMEOUT = 60 * 1000; + + private ClusterEndpoint cluster1; + private ClusterEndpoint cluster2; + private ReceiverEndpoint receiver; + private ProxyEndpoint proxy; + + public void testInOutNoTxNoRb() throws Exception { + createRoute(Transacted.None, false, false, false); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOut); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Active, exchange.getStatus()); + exchange.setStatus(Status.Done); + client.send(exchange); + client.close(); + receiver.assertExchangesReceived(2, TIMEOUT); + } + + public void testInOutNoTxNoRbInError() throws Exception { + createRoute(Transacted.None, false, false, true); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOut); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Error, exchange.getStatus()); + client.close(); + receiver.assertExchangesReceived(1, TIMEOUT); + } + + public void testInOutNoTxRb() throws Exception { + createRoute(Transacted.None, true, false, false); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOut); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Active, exchange.getStatus()); + exchange.setStatus(Status.Done); + client.send(exchange); + client.close(); + receiver.assertExchangesReceived(2, TIMEOUT); + } + + public void testInOutNoTxRbInError() throws Exception { + createRoute(Transacted.None, true, false, true); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOut); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Error, exchange.getStatus()); + client.close(); + receiver.assertExchangesReceived(1, TIMEOUT); + } + + public void testInOutJmsTxNoRb() throws Exception { + createRoute(Transacted.Jms, false, false, false); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOut); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Active, exchange.getStatus()); + exchange.setStatus(Status.Done); + client.send(exchange); + client.close(); + receiver.assertExchangesReceived(2, TIMEOUT); + } + + public void testInOutJmsTxNoRbInError() throws Exception { + createRoute(Transacted.Jms, false, false, true); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOut); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Error, exchange.getStatus()); + client.close(); + receiver.assertExchangesReceived(1, TIMEOUT); + } + + public void testInOutJmsTxRb() throws Exception { + createRoute(Transacted.Jms, true, false, false); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOut); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Active, exchange.getStatus()); + exchange.setStatus(Status.Done); + client.send(exchange); + client.close(); + receiver.assertExchangesReceived(2, TIMEOUT); + } + + public void testInOutJmsTxRbInError() throws Exception { + createRoute(Transacted.Jms, true, false, true); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOut); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Active, exchange.getStatus()); + exchange.setStatus(Status.Done); + client.send(exchange); + client.close(); + receiver.assertExchangesReceived(3, TIMEOUT); + } + + public void testInOutAckTxNoRb() throws Exception { + createRoute(Transacted.ClientAck, false, false, false); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOut); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Active, exchange.getStatus()); + exchange.setStatus(Status.Done); + client.send(exchange); + client.close(); + receiver.assertExchangesReceived(2, TIMEOUT); + } + + public void testInOutAckTxNoRbInError() throws Exception { + createRoute(Transacted.ClientAck, false, false, true); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOut); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Error, exchange.getStatus()); + client.close(); + receiver.assertExchangesReceived(1, TIMEOUT); + } + + public void testInOutAckTxRb() throws Exception { + createRoute(Transacted.ClientAck, true, false, false); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOut); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Active, exchange.getStatus()); + exchange.setStatus(Status.Done); + client.send(exchange); + client.close(); + receiver.assertExchangesReceived(2, TIMEOUT); + } + + public void testInOutAckTxRbInError() throws Exception { + createRoute(Transacted.ClientAck, true, false, true); + + Channel client = nmr1.createChannel(); + Exchange exchange = client.createExchange(Pattern.InOut); + exchange.getIn().setBody(new StringSource("<hello/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Active, exchange.getStatus()); + exchange.setStatus(Status.Done); + client.send(exchange); + client.close(); + receiver.assertExchangesReceived(3, TIMEOUT); + } + + protected void createRoute(Transacted transacted, + boolean rollbackOnErrors, + boolean sendFault, + boolean sendError) throws Exception { + cluster1 = createCluster(nmr1, "nmr1", transacted, rollbackOnErrors); + cluster2 = createCluster(nmr2, "nmr2", transacted, !rollbackOnErrors); // the rollbackOnErrors flag should not be used on the JMS consumer side + receiver = createReceiver(nmr2, sendFault, sendError); + proxy = createProxy(nmr1, cluster1); + } + + @Override + protected void setUp() throws Exception { + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + listener.assertExchangeCompleted(); + cluster1.destroy(); + cluster2.destroy(); + super.tearDown(); + } +} Added: servicemix/smx4/nmr/trunk/jbi/cluster/src/test/java/org/apache/servicemix/jbi/cluster/ReconnectTest.java URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/cluster/src/test/java/org/apache/servicemix/jbi/cluster/ReconnectTest.java?rev=741598&view=auto ============================================================================== --- servicemix/smx4/nmr/trunk/jbi/cluster/src/test/java/org/apache/servicemix/jbi/cluster/ReconnectTest.java (added) +++ servicemix/smx4/nmr/trunk/jbi/cluster/src/test/java/org/apache/servicemix/jbi/cluster/ReconnectTest.java Fri Feb 6 15:50:53 2009 @@ -0,0 +1,147 @@ +package org.apache.servicemix.jbi.cluster; + +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.ConnectionFactory; + +import org.apache.activemq.Service; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.pool.XaPooledConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.servicemix.jbi.cluster.requestor.Transacted; +import org.apache.servicemix.jbi.cluster.requestor.AbstractPollingRequestorPool; +import org.apache.servicemix.jbi.cluster.requestor.ActiveMQJmsRequestorPool; +import org.apache.servicemix.nmr.api.Channel; +import org.apache.servicemix.nmr.api.Exchange; +import org.apache.servicemix.nmr.api.Pattern; +import org.apache.servicemix.nmr.api.Endpoint; +import org.apache.servicemix.nmr.api.Status; +import org.apache.servicemix.nmr.api.service.ServiceHelper; +import org.apache.servicemix.nmr.core.util.StringSource; + +/** + * Created by IntelliJ IDEA. + * User: gnodet + * Date: Feb 3, 2009 + * Time: 5:18:32 PM + * To change this template use File | Settings | File Templates. + */ +public class ReconnectTest extends AbstractClusterEndpointTest { + + private static final long TIMEOUT = 10 * 60 * 1000; + + private ClusterEndpoint cluster1; + private ClusterEndpoint cluster2; + private ReceiverEndpoint receiver; + private ProxyEndpoint proxy; + + public void testLoadInOnly() throws Exception { + createRoute(Transacted.Jms, true, false, false); + + final int nbThreads = 10; + final int nbExchanges = 10; + final ReadWriteLock lock = new ReentrantReadWriteLock(); + final CountDownLatch latch = new CountDownLatch(nbThreads); + final AtomicInteger id = new AtomicInteger(); + lock.writeLock().lock(); + for (int i = 0; i < nbThreads; i++) { + new Thread() { + public void run() { + Channel client = null; + try { + client = nmr1.createChannel(); + lock.readLock().lock(); + for (int i = 0; i < nbExchanges; i++) { + Exchange exchange = client.createExchange(Pattern.InOnly); + exchange.getIn().setBody(new StringSource("<hello id='" + id.getAndIncrement() + "'/>")); + exchange.setTarget(nmr1.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, PROXY_ENDPOINT_NAME))); + client.sendSync(exchange); + assertEquals(Status.Done, exchange.getStatus()); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + lock.readLock().unlock(); + latch.countDown(); + if (client != null) { + client.close(); + } + } + } + }.start(); + } + + long t0, t1; + + cluster2.pause(); + + t0 = System.currentTimeMillis(); + lock.writeLock().unlock(); + + latch.await(); + + Thread.sleep(1000); + + cluster2.resume(); + + //Thread.sleep(100); + + broker.stop(); + Thread.sleep(1000); + broker = createBroker(); + + latch.await(); + + receiver.assertExchangesReceived(nbThreads * nbExchanges, TIMEOUT); + //Thread.sleep(500); + //receiver.assertExchangesReceived(nbThreads * nbExchanges, TIMEOUT); + + t1 = System.currentTimeMillis(); + + System.err.println("Elapsed time: " + (t1 - t0) + " ms"); + System.err.println("Throuput: " + (nbThreads * nbExchanges * 1000 / (t1 - t0)) + " messages/sec"); + } + + protected ConnectionFactory createConnectionFactory() { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); + return cf; +// XaPooledConnectionFactory cnf = new XaPooledConnectionFactory(cf); +// cnf.setTransactionManager(transactionManager); +// return cnf; + } + + protected Service createBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(true); + broker.setUseJmx(false); + broker.addConnector("tcp://localhost:61616"); + broker.start(); + return broker; + } + + protected void createRoute(Transacted transacted, + boolean rollbackOnErrors, + boolean sendFault, + boolean sendError) throws Exception { + cluster1 = createCluster(nmr1, "nmr1", transacted, rollbackOnErrors); + cluster2 = createCluster(nmr2, "nmr2", transacted, !rollbackOnErrors); // the rollbackOnErrors flag should not be used on the JMS consumer side + receiver = createReceiver(nmr2, sendFault, sendError); + proxy = createProxy(nmr1, cluster1); + } + + @Override + protected void setUp() throws Exception { + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + listener.assertExchangeCompleted(); + cluster1.destroy(); + cluster2.destroy(); + super.tearDown(); + } +} Added: servicemix/smx4/nmr/trunk/jbi/cluster/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/cluster/src/test/resources/log4j.properties?rev=741598&view=auto ============================================================================== --- servicemix/smx4/nmr/trunk/jbi/cluster/src/test/resources/log4j.properties (added) +++ servicemix/smx4/nmr/trunk/jbi/cluster/src/test/resources/log4j.properties Fri Feb 6 15:50:53 2009 @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +# +# The logging properties used during tests.. +# +log4j.rootLogger=INFO, out + +log4j.logger.org.springframework=INFO +log4j.logger.org.apache.activemq=INFO +log4j.logger.org.apache.activemq.spring=WARN +log4j.logger.org.apache.activemq.store.journal=INFO +log4j.logger.org.activeio.journal=INFO + +# CONSOLE appender not used by default +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} | %-5.5p | %-16.16t | %-32.32c{1} | %-32.32C %4L | %m%n + +# File appender +log4j.appender.out=org.apache.log4j.FileAppender +log4j.appender.out.layout=org.apache.log4j.PatternLayout +log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n +log4j.appender.out.file=target/servicemix-test.log +log4j.appender.out.append=true
