Author: bsnyder
Date: Tue Nov 17 06:43:17 2009
New Revision: 881174

URL: http://svn.apache.org/viewvc?rev=881174&view=rev
Log:
Changes to the test for AMQ-2324 and AMQ-2484, trying to get it to pass consistently.

Modified:
    activemq/trunk/activemq-core/pom.xml
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
    activemq/trunk/pom.xml

Modified: activemq/trunk/activemq-core/pom.xml
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=881174&r1=881173&r2=881174&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Tue Nov 17 06:43:17 2009
@@ -213,6 +213,11 @@
       <scope>test</scope>
     </dependency>    
     <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-jmdns_1.0</artifactId>
       <optional>true</optional>

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java?rev=881174&r1=881173&r2=881174&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
 Tue Nov 17 06:43:17 2009
@@ -1,17 +1,25 @@
 package org.apache.activemq.network;
 
+import java.io.File;
+import java.io.IOException;
+
 import javax.jms.DeliveryMode;
 
 import junit.framework.Test;
 
 import org.apache.activemq.broker.StubConnection;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.util.Wait;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -29,17 +37,67 @@
         NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
         config.setBrokerName("local");
         config.setDispatchAsync(false);
-        bridge = new DemandForwardingBridge(config, createTransport(), 
createRemoteTransport());
+        
+        Transport localTransport = createTransport(); 
+        localTransport.setTransportListener(new TransportListener() {
+               Command command = null;
+                       public void onCommand(Object o) {
+                               this.command = (Command) o;
+                               LOG.info("Command from [" + command.getFrom() + 
"] to [" + command.getTo() + "]");
+                       }
+
+                       public void onException(IOException error) {
+                               LOG.info("Command from [" + command.getFrom() + 
"] to [" + command.getTo() + "]");
+                               LOG.info("Exception: " + error);
+                       }
+
+                       public void transportInterupted() {
+                               LOG.info("Interruption on local transport");
+                       }
+
+                       public void transportResumed() {
+                               LOG.info("Resumption on local transport");
+                       }
+        });
+        
+        Transport remoteTransport = createRemoteTransport();
+        remoteTransport.setTransportListener(new TransportListener() {
+               Command command = null;
+                       public void onCommand(Object o) {
+                               this.command = (Command) o;
+                               LOG.info("Command from [" + command.getFrom() + 
"] to [" + command.getTo() + "]");
+                       }
+
+                       public void onException(IOException error) {
+                               LOG.info("Command from [" + command.getFrom() + 
"] to [" + command.getTo() + "]");
+                               LOG.info("Exception: " + error);
+                       }
+
+                       public void transportInterupted() {
+                               LOG.info("Interruption on remote transport");
+                       }
+
+                       public void transportResumed() {
+                               LOG.info("Resumption on remote transport");
+                       }
+        });
+        
+        bridge = new DemandForwardingBridge(config, localTransport, 
remoteTransport);
         bridge.setBrokerService(broker);
         bridge.start();
         
         // Enable JMX support on the local and remote brokers 
-        broker.setUseJmx(true);
-        remoteBroker.setUseJmx(true);
+//        broker.setUseJmx(true);
+//        remoteBroker.setUseJmx(true);
         
-        // Set the names of teh local and remote brokers 
-        broker.setBrokerName("local");
-        remoteBroker.setBrokerName("remote");
+        // Make sure persistence is disabled 
+        broker.setPersistent(false);
+        broker.setPersistenceAdapter(null);
+        remoteBroker.setPersistent(false);
+        remoteBroker.setPersistenceAdapter(null);
+        
+        // Remove the activemq-data directory from the creation of the remote 
broker
+        FileUtils.deleteDirectory(new File("activemq-data"));
     }
        
        protected void tearDown() throws Exception {
@@ -66,31 +124,42 @@
         
         for (int i = 0; i < sendNumMessages; ++i) {
                destinationInfo1 = createDestinationInfo(connection1, 
connectionInfo1, ActiveMQDestination.QUEUE_TYPE);
-               connection1.send(createMessage(producerInfo, destinationInfo1, 
DeliveryMode.NON_PERSISTENT));
+//             connection1.send(createMessage(producerInfo, destinationInfo1, 
DeliveryMode.NON_PERSISTENT));
+               connection1.request(createMessage(producerInfo, 
destinationInfo1, DeliveryMode.NON_PERSISTENT));
         }
         
         // Ensure that there are 10 messages on the local broker 
-        assertTrue(countMessagesInQueue(connection1, connectionInfo1, 
destinationInfo1) == 10);
+        int messageCount1 = countMessagesInQueue(connection1, connectionInfo1, 
destinationInfo1);
+        assertEquals(10, messageCount1);
         
         
         // Create a consumer on the remote broker 
-        StubConnection connection2 = createRemoteConnection();
+        final StubConnection connection2 = createRemoteConnection();
         ConnectionInfo connectionInfo2 = createConnectionInfo();
         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
         connection2.send(connectionInfo2);
         connection2.send(sessionInfo2);
         ActiveMQDestination destinationInfo2 = 
                createDestinationInfo(connection2, connectionInfo2, 
ActiveMQDestination.QUEUE_TYPE);
-        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, 
destinationInfo2);
+        final ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, 
destinationInfo2);
         connection2.send(consumerInfo2);
         
         // Consume 5 of the messages from the remote broker and ack them. 
-        // Because the prefetch size is set to 1000, this will cause the 
-        // messages on the local broker to be forwarded to the remote broker. 
+        // Because the prefetch size is set to 1000 in the 
createConsumerInfo() 
+        // method, this will cause the messages on the local broker to be 
+        // forwarded to the remote broker. 
         for (int i = 0; i < receiveNumMessages; ++i) {
-               Message message1 = receiveMessage(connection2);
-               assertNotNull(message1);
-            connection2.send(createAck(consumerInfo2, message1, 1, 
MessageAck.STANDARD_ACK_TYPE));
+               assertTrue("Message " + i + " was not received", 
Wait.waitFor(new Wait.Condition() {
+                public boolean isSatisified() throws Exception {
+                               Message message1 = receiveMessage(connection2);
+                               assertNotNull(message1);
+                           connection2.send(createAck(consumerInfo2, message1, 
1, MessageAck.STANDARD_ACK_TYPE));
+                    return message1 != null;
+                }            
+            }));
+//             Message message1 = receiveMessage(connection2);
+//             assertNotNull(message1);
+//          connection2.send(createAck(consumerInfo2, message1, 1, 
MessageAck.STANDARD_ACK_TYPE));
         }
         
         // Close the consumer on the remote broker 
@@ -99,10 +168,13 @@
         // Ensure that there are zero messages on the local broker. This tells 
         // us that those messages have been prefetched to the remote broker 
         // where the demand exists. 
-        assertTrue(countMessagesInQueue(connection1, connectionInfo1, 
destinationInfo1) == 0);
+        int messageCount2 = countMessagesInQueue(connection1, connectionInfo1, 
destinationInfo1);
+// Sometimes it fails here 
+        assertEquals(0, messageCount2);
         
         // There should now be 5 messages stuck on the remote broker 
-        assertTrue(countMessagesInQueue(connection2, connectionInfo2, 
destinationInfo1) == 5);
+        int messageCount3 = countMessagesInQueue(connection2, connectionInfo2, 
destinationInfo2);
+        assertEquals(5, messageCount3);
         
         // Create a consumer on the local broker just to confirm that it 
doesn't 
         // receive any messages  
@@ -113,27 +185,38 @@
                //////////////////////////////////////////////////////
         // An assertNull() is done here because this is currently the correct 
         // behavior. This is actually the purpose of this test - to prove that 
-        // messages are stuck on the remote broker. AMQ-2324 aims to fix this 
-        // situation so that messages don't get stuck. 
+        // messages are stuck on the remote broker. AMQ-2324 and AMQ-2484 aim 
+        // to fix this situation so that messages don't get stuck. 
         assertNull(message1);
                //////////////////////////////////////////////////////
         
-        consumerInfo2 = createConsumerInfo(sessionInfo2, destinationInfo2);
-        connection2.send(consumerInfo2);
+        ConsumerInfo consumerInfo3 = createConsumerInfo(sessionInfo2, 
destinationInfo2);
+        connection2.send(consumerInfo3);
         
         // Consume the last 5 messages from the remote broker and ack them 
just 
         // to clean up the queue. 
+        int counter = 0;
         for (int i = 0; i < receiveNumMessages; ++i) {
                message1 = receiveMessage(connection2);
                assertNotNull(message1);
-            connection2.send(createAck(consumerInfo2, message1, 1, 
MessageAck.STANDARD_ACK_TYPE));
+            connection2.send(createAck(consumerInfo3, message1, 1, 
MessageAck.STANDARD_ACK_TYPE));
+            ++counter;
         }
+        // Ensure that 5 messages were received
+        assertEquals(receiveNumMessages, counter);
         
-        // Close the consumer on the remote broker 
-        connection2.send(consumerInfo2.createRemoveCommand());
+        Thread.sleep(2000);
         
         // Ensure that the queue on the remote broker is empty 
-        assertTrue(countMessagesInQueue(connection2, connectionInfo2, 
destinationInfo2) == 0);
+        int messageCount4 = countMessagesInQueue(connection2, connectionInfo2, 
destinationInfo1);
+// Sometimes it fails here 
+        assertEquals(0, messageCount4);
+        
+        // Close the consumer on the remote broker 
+        connection2.send(consumerInfo3.createRemoveCommand());
+        
+        connection1.stop();
+        connection2.stop();
        }
        
     public static Test suite() {

Modified: activemq/trunk/pom.xml
URL: 
http://svn.apache.org/viewvc/activemq/trunk/pom.xml?rev=881174&r1=881173&r2=881174&view=diff
==============================================================================
--- activemq/trunk/pom.xml (original)
+++ activemq/trunk/pom.xml Tue Nov 17 06:43:17 2009
@@ -50,6 +50,7 @@
     <openjpa-version>1.2.0</openjpa-version>
     <commons-dbcp-version>1.2.2</commons-dbcp-version>
     <commons-httpclient-version>3.1</commons-httpclient-version>
+    <commons-io-version>1.4</commons-io-version>
     <commons-logging-version>1.1</commons-logging-version>
     <commons-pool-version>1.4</commons-pool-version>
     <commons-primitives-version>1.0</commons-primitives-version>
@@ -815,6 +816,12 @@
       </dependency>
 
       <dependency>
+        <groupId>commons-io</groupId>
+        <artifactId>commons-io</artifactId>
+        <version>${commons-io-version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>org.apache.ant</groupId>
         <artifactId>ant</artifactId>
         <version>${ant-version}</version>


Reply via email to