Author: chirino
Date: Wed Jul 19 22:37:29 2006
New Revision: 423780

URL: http://svn.apache.org/viewvc?rev=423780&view=rev
Log:
Adding some network reconnect tests.  These are used to validate that our 
network connections get re-established after a broker restart.

Added:
    
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java
    
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java
    
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml
    
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml
    
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml
    
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml

Added: 
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java?rev=423780&view=auto
==============================================================================
--- 
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java
 (added)
+++ 
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java
 Wed Jul 19 22:37:29 2006
@@ -0,0 +1,314 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.ConsumerEvent;
+import org.apache.activemq.advisory.ConsumerEventSource;
+import org.apache.activemq.advisory.ConsumerListener;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * These test cases are used to verify that network connections get
+ * re-established in all broker restart scenarios.
+ *  
+ * @author chirino
+ */
+public class NetworkReconnectTest extends TestCase {
+
+       private BrokerService producerBroker;
+       private BrokerService consumerBroker;
+       private ActiveMQConnectionFactory producerConnectionFactory;
+       private ActiveMQConnectionFactory consumerConnectionFactory;
+       private Destination destination;
+       private ArrayList connections = new ArrayList();
+       
+       public void testMultipleProducerBrokerRestarts() throws Exception {
+               for (int i = 0; i < 10; i++) {
+                       testWithProducerBrokerRestart();
+                       disposeConsumerConnections();
+               }
+       }
+       
+       /**
+        * Baseline scenario: both brokers are started and neither is restarted.
+        * One message sent via the producer connection factory must be received
+        * by a consumer created from the consumer connection factory, and no
+        * further message may be pending afterwards.
+        */
+       public void testWithoutRestarts() throws Exception {
+               startProducerBroker();
+               startConsumerBroker();
+
+               MessageConsumer consumer = createConsumer();
+               AtomicInteger counter = createConsumerCounter(producerConnectionFactory);
+               waitForConsumerToArrive(counter);
+
+               String messageId = sendMessage();
+               Message message = consumer.receive(1000);
+
+               // receive(timeout) yields null when nothing arrives in time; report
+               // that as an assertion failure rather than a NullPointerException.
+               assertNotNull("The message was not received by the consumer.", message);
+               assertEquals(messageId, message.getJMSMessageID());
+
+               // Exactly one message was sent, so nothing else should be waiting.
+               assertNull( consumer.receiveNoWait() );
+       }
+
+       /**
+        * Verifies message flow before and after the producer (first) broker is
+        * stopped and started again while the consumer broker keeps running.
+        */
+       public void testWithProducerBrokerRestart() throws Exception {
+               startProducerBroker();
+               startConsumerBroker();
+
+               MessageConsumer consumer = createConsumer();
+               AtomicInteger counter = createConsumerCounter(producerConnectionFactory);
+               waitForConsumerToArrive(counter);
+
+               String messageId = sendMessage();
+               Message message = consumer.receive(1000);
+
+               // receive(timeout) yields null on timeout; fail clearly instead of with an NPE.
+               assertNotNull("The message was not received before the restart.", message);
+               assertEquals(messageId, message.getJMSMessageID());
+               assertNull( consumer.receiveNoWait() );
+
+               // Restart the first broker...
+               stopProducerBroker();
+               startProducerBroker();
+
+               // The previous counter was fed by a connection to the stopped broker,
+               // so a new one is attached to the restarted broker.
+               counter = createConsumerCounter(producerConnectionFactory);
+               waitForConsumerToArrive(counter);
+
+               messageId = sendMessage();
+               message = consumer.receive(1000);
+
+               assertNotNull("The message was not received after the producer broker restart.", message);
+               assertEquals(messageId, message.getJMSMessageID());
+               assertNull( consumer.receiveNoWait() );
+       }
+
+       /**
+        * Verifies message flow before and after the consumer (second) broker is
+        * stopped and started again while the producer broker keeps running.
+        */
+       public void testWithConsumerBrokerRestart() throws Exception {
+
+               startProducerBroker();
+               startConsumerBroker();
+
+               MessageConsumer consumer = createConsumer();
+               AtomicInteger counter = createConsumerCounter(producerConnectionFactory);
+               waitForConsumerToArrive(counter);
+
+               String messageId = sendMessage();
+               Message message = consumer.receive(1000);
+
+               // receive(timeout) yields null on timeout; fail clearly instead of with an NPE.
+               assertNotNull("The message was not received before the restart.", message);
+               assertEquals(messageId, message.getJMSMessageID());
+               assertNull( consumer.receiveNoWait() );
+
+               // Restart the consumer (second) broker...
+               stopConsumerBroker();
+               waitForConsumerToLeave(counter);
+               startConsumerBroker();
+
+               // The old consumer was attached to the stopped broker; create a new
+               // one. The counter is reused because the producer broker kept running.
+               consumer = createConsumer();
+               waitForConsumerToArrive(counter);
+
+               messageId = sendMessage();
+               message = consumer.receive(1000);
+
+               assertNotNull("The message was not received after the consumer broker restart.", message);
+               assertEquals(messageId, message.getJMSMessageID());
+               assertNull( consumer.receiveNoWait() );
+       }
+       
+       /**
+        * Starts the consumer broker and a consumer first, waits five seconds,
+        * and only then starts the producer broker; a message sent afterwards
+        * must still reach the consumer.
+        */
+       public void testWithConsumerBrokerStartDelay() throws Exception {
+
+               startConsumerBroker();
+               MessageConsumer consumer = createConsumer();
+
+               Thread.sleep(1000*5);
+
+               startProducerBroker();
+               AtomicInteger counter = createConsumerCounter(producerConnectionFactory);
+               waitForConsumerToArrive(counter);
+
+               String messageId = sendMessage();
+               Message message = consumer.receive(1000);
+
+               // receive(timeout) yields null on timeout; fail clearly instead of with an NPE.
+               assertNotNull("The message was not received by the consumer.", message);
+               assertEquals(messageId, message.getJMSMessageID());
+
+               assertNull( consumer.receiveNoWait() );
+       }
+
+       
+       /**
+        * Starts the producer broker and its consumer counter first, waits five
+        * seconds, and only then starts the consumer broker and a consumer; a
+        * message sent afterwards must still reach the consumer.
+        */
+       public void testWithProducerBrokerStartDelay() throws Exception {
+
+               startProducerBroker();
+               AtomicInteger counter = createConsumerCounter(producerConnectionFactory);
+
+               Thread.sleep(1000*5);
+
+               startConsumerBroker();
+               MessageConsumer consumer = createConsumer();
+
+               waitForConsumerToArrive(counter);
+
+               String messageId = sendMessage();
+               Message message = consumer.receive(1000);
+
+               // receive(timeout) yields null on timeout; fail clearly instead of with an NPE.
+               assertNotNull("The message was not received by the consumer.", message);
+               assertEquals(messageId, message.getJMSMessageID());
+
+               assertNull( consumer.receiveNoWait() );
+       }
+
+       /**
+        * Prepares the two connection factories and the queue used by every test.
+        * No broker is started here; each test starts the brokers it needs.
+        */
+       protected void setUp() throws Exception {
+               this.producerConnectionFactory = createProducerConnectionFactory();
+               this.consumerConnectionFactory = createConsumerConnectionFactory();
+               this.destination = new ActiveMQQueue("RECONNECT.TEST.QUEUE");
+       }
+       
+       /**
+        * Closes all tracked connections and then stops both brokers. Broker
+        * shutdown is best effort: anything thrown while stopping one broker is
+        * discarded so that the other broker is still stopped.
+        */
+       protected void tearDown() throws Exception {
+               disposeConsumerConnections();
+
+               try {
+                       stopProducerBroker();
+               } catch (Throwable ignored) {
+                       // best effort, see method comment
+               }
+
+               try {
+                       stopConsumerBroker();
+               } catch (Throwable ignored) {
+                       // best effort, see method comment
+               }
+       }
+       
+       /**
+        * Closes every connection recorded in {@code connections} and then
+        * empties the list. Close failures are ignored on purpose, because some
+        * of the connections may point at a broker that has already been stopped.
+        */
+       protected void disposeConsumerConnections() {
+               for (Iterator iter = connections.iterator(); iter.hasNext();) {
+                       Connection connection = (Connection) iter.next();
+                       try {
+                               connection.close();
+                       } catch (Throwable ignored) {
+                               // best-effort cleanup
+                       }
+               }
+               // Forget the closed connections so that repeated calls (for example
+               // from testMultipleProducerBrokerRestarts followed by tearDown) do not
+               // close them again and the list does not keep growing.
+               connections.clear();
+       }
+       
+       /**
+        * Creates and starts the producer broker unless one is already held in
+        * {@code producerBroker}.
+        */
+       protected void startProducerBroker() throws Exception {
+               if (producerBroker != null) {
+                       return;
+               }
+               producerBroker = createFirstBroker();
+               producerBroker.start();
+       }
+       
+       /**
+        * Stops the producer broker, if there is one, and forgets the reference
+        * once the stop call has returned normally.
+        */
+       protected void stopProducerBroker() throws Exception {
+               if (producerBroker == null) {
+                       return;
+               }
+               producerBroker.stop();
+               producerBroker = null;
+       }
+       
+       /**
+        * Creates and starts the consumer broker unless one is already held in
+        * {@code consumerBroker}.
+        */
+       protected void startConsumerBroker() throws Exception {
+               if (consumerBroker != null) {
+                       return;
+               }
+               consumerBroker = createSecondBroker();
+               consumerBroker.start();
+       }
+       
+       /**
+        * Stops the consumer broker, if there is one, and forgets the reference
+        * once the stop call has returned normally.
+        */
+       protected void stopConsumerBroker() throws Exception {
+               if (consumerBroker == null) {
+                       return;
+               }
+               consumerBroker.stop();
+               consumerBroker = null;
+       }
+       
+       /**
+        * Builds the first (producer) broker from its XBean configuration.
+        * Subclasses may override this to supply a different configuration.
+        */
+       protected BrokerService createFirstBroker() throws Exception {
+               URI configuration = new URI("xbean:org/apache/activemq/network/reconnect-broker1.xml");
+               return BrokerFactory.createBroker(configuration);
+       }
+       
+       /**
+        * Builds the second (consumer) broker from its XBean configuration.
+        * Subclasses may override this to supply a different configuration.
+        */
+       protected BrokerService createSecondBroker() throws Exception {
+               URI configuration = new URI("xbean:org/apache/activemq/network/reconnect-broker2.xml");
+               return BrokerFactory.createBroker(configuration);
+       }
+
+       /**
+        * Returns a connection factory that targets {@code vm://broker1}.
+        */
+       protected ActiveMQConnectionFactory createProducerConnectionFactory() {
+               ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://broker1");
+               return factory;
+       }
+       
+       /**
+        * Returns a connection factory that targets {@code vm://broker2}.
+        */
+       protected ActiveMQConnectionFactory createConsumerConnectionFactory() {
+               ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://broker2");
+               return factory;
+       }
+       
+       /**
+        * Sends one empty message to the test destination over a short-lived
+        * connection obtained from the producer connection factory.
+        *
+        * @return the JMS message id of the message that was sent
+        * @throws JMSException if connecting, creating the session or producer,
+        *         or sending fails
+        */
+       protected String sendMessage() throws JMSException {
+               Connection connection = null;
+               try {
+                       connection = producerConnectionFactory.createConnection();
+                       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                       MessageProducer producer = session.createProducer(destination);
+                       Message message = session.createMessage();
+                       producer.send(message);
+                       return message.getJMSMessageID();
+               } finally {
+                       // createConnection() may have thrown, leaving nothing to close;
+                       // check explicitly instead of relying on a swallowed NPE.
+                       if (connection != null) {
+                               try {
+                                       connection.close();
+                               } catch (Throwable ignored) {
+                                       // Best-effort cleanup: a close failure must not
+                                       // replace the outcome of the send.
+                               }
+                       }
+               }
+       }
+       
+       /**
+        * Opens and starts a connection from the consumer connection factory,
+        * records it for later disposal, and returns a consumer on the test
+        * destination using a non-transacted, auto-acknowledge session.
+        *
+        * @return the newly created consumer
+        * @throws JMSException if any of the JMS calls fails
+        */
+       protected MessageConsumer createConsumer() throws JMSException {
+               Connection consumerConnection = consumerConnectionFactory.createConnection();
+               connections.add(consumerConnection);
+               consumerConnection.start();
+
+               return consumerConnection
+                               .createSession(false, Session.AUTO_ACKNOWLEDGE)
+                               .createConsumer(destination);
+       }
+       
+       /**
+        * Opens and starts a connection from the given factory, records it for
+        * later disposal, and attaches a {@link ConsumerEventSource} for the test
+        * destination. Each consumer event overwrites the returned counter with
+        * the consumer count carried by that event.
+        *
+        * @param cf factory used to create the connection the event source runs on
+        * @return a counter, initially zero, updated from consumer events
+        */
+       protected AtomicInteger createConsumerCounter(ActiveMQConnectionFactory cf) throws Exception {
+               final AtomicInteger consumerCount = new AtomicInteger(0);
+
+               Connection advisoryConnection = cf.createConnection();
+               connections.add(advisoryConnection);
+               advisoryConnection.start();
+
+               ConsumerListener listener = new ConsumerListener() {
+                       public void onConsumerEvent(ConsumerEvent event) {
+                               consumerCount.set(event.getConsumerCount());
+                       }
+               };
+
+               ConsumerEventSource source = new ConsumerEventSource(advisoryConnection, destination);
+               source.setConsumerListener(listener);
+               source.start();
+
+               return consumerCount;
+       }
+       
+       /**
+        * Polls the counter up to 100 times, sleeping 50 ms after each
+        * unsuccessful poll, and returns as soon as it is greater than zero.
+        * Fails the test if that never happens.
+        *
+        * @param consumerCounter counter obtained from createConsumerCounter
+        */
+       protected void waitForConsumerToArrive(AtomicInteger consumerCounter) throws InterruptedException {
+               int polls = 0;
+               while (polls < 100) {
+                       if (consumerCounter.get() > 0) {
+                               return;
+                       }
+                       Thread.sleep(50);
+                       polls++;
+               }
+               fail("The consumer did not arrive.");
+       }
+       
+       /**
+        * Polls the counter up to 100 times, sleeping 50 ms after each
+        * unsuccessful poll, and returns as soon as it reads zero. Fails the
+        * test if that never happens.
+        *
+        * @param consumerCounter counter obtained from createConsumerCounter
+        */
+       protected void waitForConsumerToLeave(AtomicInteger consumerCounter) throws InterruptedException {
+               int polls = 0;
+               while (polls < 100) {
+                       if (consumerCounter.get() == 0) {
+                               return;
+                       }
+                       Thread.sleep(50);
+                       polls++;
+               }
+               fail("The consumer did not leave.");
+       }
+
+}

Added: 
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java?rev=423780&view=auto
==============================================================================
--- 
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java
 (added)
+++ 
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java
 Wed Jul 19 22:37:29 2006
@@ -0,0 +1,90 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+
+
+/**
+ * Test network reconnects over SSH tunnels.  This case can be especially
+ * tricky since the SSH tunnels
+ * fool the TCP transport into thinking that they are initially connected.
+ *  
+ * @author chirino
+ */
+public class SSHTunnelNetworkReconnectTest extends NetworkReconnectTest {
+
+       ArrayList processes = new ArrayList();
+       
+       
+       /**
+        * Builds the first broker from the SSH-tunnel variant of its XBean
+        * configuration.
+        */
+       protected BrokerService createFirstBroker() throws Exception {
+               URI configuration = new URI("xbean:org/apache/activemq/network/ssh-reconnect-broker1.xml");
+               return BrokerFactory.createBroker(configuration);
+       }
+       
+       /**
+        * Builds the second broker from the SSH-tunnel variant of its XBean
+        * configuration.
+        */
+       protected BrokerService createSecondBroker() throws Exception {
+               URI configuration = new URI("xbean:org/apache/activemq/network/ssh-reconnect-broker2.xml");
+               return BrokerFactory.createBroker(configuration);
+       }
+       
+       /**
+        * Launches the two {@code ssh} port-forwarding processes and then runs
+        * the inherited set-up. If any step fails, the processes started so far
+        * are destroyed before the failure propagates.
+        */
+       protected void setUp() throws Exception {
+               boolean ready = false;
+               try {
+                       startProcess("ssh -Nn -L60006:localhost:61616 localhost");
+                       startProcess("ssh -Nn -L60007:localhost:61617 localhost");
+                       super.setUp();
+                       ready = true;
+               } finally {
+                       if (!ready) {
+                               // JUnit 3 does not invoke tearDown() when setUp() throws, so
+                               // the ssh processes would otherwise be left running.
+                               for (Iterator iter = processes.iterator(); iter.hasNext();) {
+                                       Process tunnel = (Process) iter.next();
+                                       tunnel.destroy();
+                               }
+                               processes.clear();
+                       }
+               }
+       }
+       
+       /**
+        * Runs the inherited tear-down and then destroys every process started
+        * by this test. The processes are destroyed even if the inherited
+        * tear-down throws.
+        */
+       protected void tearDown() throws Exception {
+               try {
+                       super.tearDown();
+               } finally {
+                       for (Iterator iter = processes.iterator(); iter.hasNext();) {
+                               Process tunnel = (Process) iter.next();
+                               tunnel.destroy();
+                       }
+                       // Drop the references so a destroyed process is never touched again.
+                       processes.clear();
+               }
+       }
+
+       /**
+        * Launches the given command as an external process, records it so that
+        * tearDown can destroy it, and echoes its standard output and standard
+        * error to this JVM's {@code System.out} and {@code System.err}.
+        *
+        * @param command the command line to execute
+        * @throws IOException if the process cannot be started
+        */
+       private void startProcess(String command) throws IOException {
+               final Process process = Runtime.getRuntime().exec(command);
+               processes.add(process);
+               pumpStream("stdout: "+command, process.getInputStream(), System.out);
+               pumpStream("stderr: "+command, process.getErrorStream(), System.err);
+       }
+
+       /**
+        * Copies {@code source} to {@code sink} byte by byte on a background
+        * thread until end of stream or an I/O error.
+        *
+        * @param threadName name given to the copying thread
+        * @param source stream to drain
+        * @param sink stream the bytes are written to
+        */
+       private void pumpStream(String threadName, final InputStream source, final java.io.PrintStream sink) {
+               Thread pump = new Thread(threadName) {
+                       public void run() {
+                               try {
+                                       int c;
+                                       while ((c = source.read()) >= 0) {
+                                               sink.write(c);
+                                       }
+                               } catch (IOException ignored) {
+                                       // Reading failed, e.g. because the process was
+                                       // destroyed; there is nothing left to copy.
+                               } finally {
+                                       // Push out any output that did not end in a newline.
+                                       sink.flush();
+                               }
+                       }
+               };
+               // A pump thread must never keep the test JVM alive on its own.
+               pump.setDaemon(true);
+               pump.start();
+       }
+}

Added: 
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml
URL: 
http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml?rev=423780&view=auto
==============================================================================
--- 
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml
 (added)
+++ 
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml
 Wed Jul 19 22:37:29 2006
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Copyright 2005-2006 The Apache Software Foundation
+   
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://activemq.org/config/1.0">
+  <bean 
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <broker brokerName="broker1" persistent="false" useShutdownHook="false" 
useJmx="false">
+
+    <transportConnectors>
+      <transportConnector uri="tcp://localhost:61616"/>
+      <transportConnector uri="vm://broker1"/>
+    </transportConnectors>
+
+    <networkConnectors>
+      <networkConnector uri="static:(tcp://localhost:61617)"/>
+    </networkConnectors>
+    
+  </broker>
+  
+</beans>
+

Added: 
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml
URL: 
http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml?rev=423780&view=auto
==============================================================================
--- 
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml
 (added)
+++ 
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml
 Wed Jul 19 22:37:29 2006
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Copyright 2005-2006 The Apache Software Foundation
+   
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://activemq.org/config/1.0">
+  <bean 
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <broker brokerName="broker2" persistent="false" useShutdownHook="false" 
useJmx="false">
+
+    <transportConnectors>
+      <transportConnector uri="tcp://localhost:61617"/>
+      <transportConnector uri="vm://broker2"/>
+    </transportConnectors>
+
+    <networkConnectors>
+      <networkConnector uri="static:(tcp://localhost:61616)"/>
+    </networkConnectors>
+    
+  </broker>
+  
+
+</beans>
+

Added: 
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml
URL: 
http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml?rev=423780&view=auto
==============================================================================
--- 
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml
 (added)
+++ 
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml
 Wed Jul 19 22:37:29 2006
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Copyright 2005-2006 The Apache Software Foundation
+   
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://activemq.org/config/1.0">
+  <bean 
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <broker brokerName="broker1" persistent="false" useShutdownHook="false" 
useJmx="false">
+
+    <transportConnectors>
+      <transportConnector uri="tcp://localhost:61616"/>
+      <transportConnector uri="vm://broker1"/>
+    </transportConnectors>
+
+    <networkConnectors>
+      <networkConnector uri="static:(tcp://localhost:60007)"/>
+    </networkConnectors>
+    
+  </broker>
+  
+</beans>
+

Added: 
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml
URL: 
http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml?rev=423780&view=auto
==============================================================================
--- 
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml
 (added)
+++ 
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml
 Wed Jul 19 22:37:29 2006
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Copyright 2005-2006 The Apache Software Foundation
+   
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://activemq.org/config/1.0">
+  <bean 
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <broker brokerName="broker2" persistent="false" useShutdownHook="false" 
useJmx="false">
+
+    <transportConnectors>
+      <transportConnector uri="tcp://localhost:61617"/>
+      <transportConnector uri="vm://broker2"/>
+    </transportConnectors>
+
+    <networkConnectors>
+      <networkConnector uri="static:(tcp://localhost:60006)"/>
+    </networkConnectors>
+    
+  </broker>
+  
+
+</beans>
+


Reply via email to