Author: tabish
Date: Wed Jun 20 13:08:10 2012
New Revision: 1352081

URL: http://svn.apache.org/viewvc?rev=1352081&view=rev
Log:
Don't use fixed ports for broker instances; let the connector find an open port.

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java?rev=1352081&r1=1352080&r2=1352081&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java Wed Jun 20 13:08:10 2012
@@ -17,14 +17,14 @@
 
 package org.apache.activemq.bugs;
 
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.network.DiscoveryNetworkConnector;
-import org.apache.activemq.network.NetworkConnector;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -34,952 +34,701 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertTrue;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+public class AMQ3274Test {
+    private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3274Test.class);
 
-/**
- *
- */
-public class AMQ3274Test
-{
-       protected static int            Next_broker_num = 0;
-       protected EmbeddedTcpBroker     broker1;
-       protected EmbeddedTcpBroker     broker2;
-
-       protected int                   nextEchoId = 0;
-       protected boolean               testError = false;
-
-       protected int                   echoResponseFill = 0;   // Number of "filler" response messages per request
-
-       protected static Log            LOG;
-
-       static
-       {
-               LOG = LogFactory.getLog(AMQ3274Test.class);
-       }
-
-       public AMQ3274Test ()
-       throws Exception
-       {
-               broker1 = new EmbeddedTcpBroker();
-               broker2 = new EmbeddedTcpBroker();
-
-               broker1.coreConnectTo(broker2, true);
-               broker2.coreConnectTo(broker1, true);
-       }
-
-       public void logMessage (String msg)
-       {
-               System.out.println(msg);
-               System.out.flush();
-       }
-
-
-       /**
-        *
-        */
-
-       public void testMessages (Session sess, MessageProducer req_prod, Destination resp_dest, int num_msg)
-       throws Exception
-       {
-               MessageConsumer resp_cons;
-               TextMessage             msg;
-               MessageClient   cons_client;
-               int                             cur;
-               int                             tot_expected;
-
-               resp_cons = sess.createConsumer(resp_dest);
-
-               cons_client = new MessageClient(resp_cons, num_msg);
-               cons_client.start();
-
-               cur = 0;
-               while ( ( cur < num_msg ) && ( ! testError ) )
-               {
-                       msg = sess.createTextMessage("MSG AAAA " + cur);
-                       msg.setIntProperty("SEQ", 100 + cur);
-                       msg.setStringProperty("TEST", "TOPO");
-                       msg.setJMSReplyTo(resp_dest);
-
-                       if ( cur == ( num_msg - 1 ) )
-                               msg.setBooleanProperty("end-of-response", true);
-
-                       req_prod.send(msg);
-
-                       cur++;
-               }
-
-                       //
-                       // Give the consumer some time to receive the response.
-                       //
-               cons_client.waitShutdown(5000);
-
-                       //
-                       // Now shutdown the consumer if it's still running.
-                       //
-               if ( cons_client.shutdown() )
-                       LOG.debug("Consumer client shutdown complete");
-               else
-                       LOG.debug("Consumer client shutdown incomplete!!!");
-
-
-                       //
-                       // Check that the correct number of messages was 
received.
-                       //
-               tot_expected = num_msg * ( echoResponseFill + 1 );
-
-               if ( cons_client.getNumMsgReceived() == tot_expected )
-               {
-                       LOG.info("Have " + tot_expected + " messages, as-expected");
-               }
-               else
-               {
-                       testError = true;
-                       LOG.info("Have " + cons_client.getNumMsgReceived() + " messages; expected " + tot_expected);
-               }
-
-               resp_cons.close();
-       }
-
-
-       /**
-        * Test one destination between the given "producer broker" and 
"consumer broker" specified.
-        */
-       public void testOneDest (Connection conn, Session sess, Destination 
cons_dest, String prod_broker_url,
-                                String cons_broker_url, int num_msg)
-       throws Exception
-       {
-               int                     echo_id;
-
-               EchoService             echo_svc;
-               String                  echo_queue_name;
-               Destination             prod_dest;
-               MessageProducer         msg_prod;
-
-               synchronized ( this )
-               {
-                       echo_id = this.nextEchoId;
-                       this.nextEchoId++;
-               }
-
-               echo_queue_name = "echo.queue." + echo_id;
-
-                       //
-                       // Remove any previously-created echo queue with the 
same name.
-                       //
-               LOG.trace("destroying the echo queue in case an old one 
exists");
-               removeQueue(conn, echo_queue_name);
-
-
-                       //
-                       // Now start the echo service with that queue.
-                       //
-               echo_svc = new EchoService(echo_queue_name, prod_broker_url);
-               echo_svc.start();
-
-
-                       //
-                       // Create the Producer to the echo request Queue
-                       //
-               LOG.trace("Creating echo queue and producer");
-               prod_dest = sess.createQueue(echo_queue_name);
-               msg_prod = sess.createProducer(prod_dest);
-
-
-                       //
-                       // Pass messages around.
-                       //
-               testMessages(sess, msg_prod, cons_dest, num_msg);
-
-
-               //
-               //
-               //
-
-               echo_svc.shutdown();
-               msg_prod.close();
-       }
-
-
-       /**
-        * TEST TEMPORARY TOPICS
-        */
-       public void testTempTopic (String prod_broker_url, String 
cons_broker_url)
-       throws Exception
-       {
-               Connection              conn;
-               Session                 sess;
-               Destination             cons_dest;
-               int                     echo_id;
-               int                     num_msg;
-
-               num_msg = 5;
-
-               LOG.info("TESTING TEMP TOPICS " + prod_broker_url + " -> " + 
cons_broker_url + " (" + num_msg +
-                        " messages)");
-
-
-                       //
-                       // Connect to the bus.
-                       //
-
-               conn = createConnection(cons_broker_url);
-               conn.start();
-               sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-
-                       //
-                       // Create the destination on which messages are being 
tested.
-                       //
-
-               LOG.trace("Creating destination");
-               cons_dest = sess.createTemporaryTopic();
-
-               testOneDest(conn, sess, cons_dest, prod_broker_url, 
cons_broker_url, num_msg);
-
-
-                       //
-                       // Cleanup
-                       //
-
-               sess.close();
-               conn.close();
-       }
-
-
-       /**
-        * TEST TOPICS
-        */
-       public void testTopic (String prod_broker_url, String cons_broker_url)
-       throws Exception
-       {
-               int                             num_msg;
-
-               Connection              conn;
-               Session                 sess;
-               String                  topic_name;
-
-               Destination             cons_dest;
-
-               num_msg = 5;
-
-               LOG.info("TESTING TOPICS " + prod_broker_url + " -> " + 
cons_broker_url + " (" + num_msg +
-                        " messages)");
-
-
-                       //
-                       // Connect to the bus.
-                       //
-
-               conn = createConnection(cons_broker_url);
-               conn.start();
-               sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-
-                       //
-                       // Create the destination on which messages are being 
tested.
-                       //
-
-               topic_name = "topotest2.perm.topic";
-               LOG.trace("Removing existing Topic");
-               removeTopic(conn, topic_name);
-               LOG.trace("Creating Topic, " + topic_name);
-               cons_dest = sess.createTopic(topic_name);
-
-               testOneDest(conn, sess, cons_dest, prod_broker_url, 
cons_broker_url, num_msg);
-
-
-                       //
-                       // Cleanup
-                       //
-
-               removeTopic(conn, topic_name);
-               sess.close();
-               conn.close();
-       }
-
-
-       /**
-        * TEST TEMPORARY QUEUES
-        */
-       public void testTempQueue (String prod_broker_url, String 
cons_broker_url)
-       throws Exception
-       {
-               int             echo_id;
-               int             num_msg;
-
-               Connection      conn;
-               Session         sess;
-
-               Destination     cons_dest;
-
-               num_msg = 5;
-
-               LOG.info("TESTING TEMP QUEUES " + prod_broker_url + " -> " + 
cons_broker_url + " (" + num_msg +
-                        " messages)");
-
-
-                       //
-                       // Connect to the bus.
-                       //
-
-               conn = createConnection(cons_broker_url);
-               conn.start();
-               sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-
-                       //
-                       // Create the destination on which messages are being 
tested.
-                       //
-
-               LOG.trace("Creating destination");
-               cons_dest = sess.createTemporaryQueue();
-
-               testOneDest(conn, sess, cons_dest, prod_broker_url, 
cons_broker_url, num_msg);
-
-
-                       //
-                       // Cleanup
-                       //
-
-               sess.close();
-               conn.close();
-       }
-
-
-       /**
-        * TEST QUEUES
-        */
-       public void testQueue (String prod_broker_url, String cons_broker_url)
-       throws Exception
-       {
-               int                             num_msg;
-
-               Connection              conn;
-               Session                 sess;
-               String                  queue_name;
-
-               Destination             cons_dest;
-
-               num_msg = 5;
-
-               LOG.info("TESTING QUEUES " + prod_broker_url + " -> " + 
cons_broker_url + " (" + num_msg +
-                        " messages)");
-
-
-                       //
-                       // Connect to the bus.
-                       //
-
-               conn = createConnection(cons_broker_url);
-               conn.start();
-               sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-
-                       //
-                       // Create the destination on which messages are being 
tested.
-                       //
-
-               queue_name = "topotest2.perm.queue";
-               LOG.trace("Removing existing Queue");
-               removeQueue(conn, queue_name);
-               LOG.trace("Creating Queue, " + queue_name);
-               cons_dest = sess.createQueue(queue_name);
-
-               testOneDest(conn, sess, cons_dest, prod_broker_url, 
cons_broker_url, num_msg);
-
-
-                       //
-                       // Cleanup
-                       //
-
-               removeQueue(conn, queue_name);
-               sess.close();
-               conn.close();
-       }
-
-       @Test
-       public void run ()
-       throws Exception
-       {
-               Thread  start1;
-               Thread  start2;
-
-               testError = false;
-
-                       // Use threads to avoid startup deadlock since the 
first broker started waits until
-                       //      it knows the name of the remote broker before 
finishing its startup, which means
-                       //      the remote must already be running.
-
-               start1 = new Thread() {
-                       public void run()
-                       {
-                               try {
-                                       broker1.start();
-                               } catch (Exception ex) {
-                                       LOG.error(null, ex);
-                               }
-                       }
-               };
-
-               start2 = new Thread() {
-                       public void run()
-                       {
-                               try {
-                                       broker2.start();
-                               } catch (Exception ex) {
-                                       LOG.error(null, ex);
-                               }
-                       }
-               };
-
-               start1.start();
-               start2.start();
-
-               start1.join();
-               start2.join();
-
-               if ( ! testError )
-                       this.testTempTopic(broker1.getConnectionUrl(), 
broker2.getConnectionUrl());
-
-               if ( ! testError )
-                       this.testTempQueue(broker1.getConnectionUrl(), 
broker2.getConnectionUrl());
-
-               if ( ! testError )
-                       this.testTopic(broker1.getConnectionUrl(), 
broker2.getConnectionUrl());
-
-               if ( ! testError )
-                       this.testQueue(broker1.getConnectionUrl(), 
broker2.getConnectionUrl());
-
-               Thread.sleep(100);
-
-               shutdown();
-
-               assertTrue(! testError);
-       }
-
-       public void shutdown ()
-       throws Exception
-       {
-               broker1.stop();
-               broker2.stop();
-       }
-
-       /**
-        * @param args the command line arguments
-        */
-       public static void main(String[] args)
-       {
-               AMQ3274Test    main_obj;
-
-               try
-               {
-                       main_obj = new AMQ3274Test();
-                       main_obj.run();
-               }
-               catch (Exception ex)
-               {
-                       ex.printStackTrace();
-                       
-                       LOG.error(null, ex);
-
-                       System.exit(0);
-               }
-       }
-
-       protected Connection    createConnection (String url)
-       throws Exception
-       {
-               return  
org.apache.activemq.ActiveMQConnection.makeConnection(url);
-       }
-
-       protected static void   removeQueue (Connection conn, String dest_name)
-       throws java.lang.Exception
-       {
-               org.apache.activemq.command.ActiveMQDestination         dest;
-
-               if ( conn instanceof org.apache.activemq.ActiveMQConnection )
-               {
-                       dest = org.apache.activemq.command.ActiveMQDestination.
-                       createDestination(dest_name, (byte) 
org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE);
-                       
((org.apache.activemq.ActiveMQConnection)conn).destroyDestination(dest);
-               }
-       }
-
-       protected static void   removeTopic (Connection conn, String dest_name)
-       throws java.lang.Exception
-       {
-               org.apache.activemq.command.ActiveMQDestination         dest;
-
-               if ( conn instanceof org.apache.activemq.ActiveMQConnection )
-               {
-                       dest = org.apache.activemq.command.ActiveMQDestination.
-                       createDestination(dest_name, (byte) 
org.apache.activemq.command.ActiveMQDestination.TOPIC_TYPE);
-                       
((org.apache.activemq.ActiveMQConnection)conn).destroyDestination(dest);
-               }
-       }
-
-       public static String fmtMsgInfo (Message msg)
-       throws Exception
-       {
-               StringBuilder           msg_desc;
-               String                  prop;
-               Enumeration             prop_enum;
-
-               msg_desc = new StringBuilder();
-               msg_desc = new StringBuilder();
-
-               if ( msg instanceof TextMessage )
-               {
-                       msg_desc.append(((TextMessage) msg).getText());
-               }
-               else
-               {
-                       msg_desc.append("[");
-                       msg_desc.append(msg.getClass().getName());
-                       msg_desc.append("]");
-               }
-
-               prop_enum = msg.getPropertyNames();
-               while ( prop_enum.hasMoreElements() )
-               {
-                       prop = (String) prop_enum.nextElement();
-                       msg_desc.append("; ");
-                       msg_desc.append(prop);
-                       msg_desc.append("=");
-                       msg_desc.append(msg.getStringProperty(prop));
-               }
-
-               return  msg_desc.toString();
-       }
-
-//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-/////////////////////////////////////////////////  INTERNAL CLASSES  
/////////////////////////////////////////////////
-//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-       protected class EmbeddedTcpBroker
-       {
-               protected BrokerService         brokerSvc;
-               protected int                   brokerNum;
-               protected String                brokerName;
-               protected String                brokerId;
-               protected int                   port;
-               protected String                tcpUrl;
-
-               public EmbeddedTcpBroker ()
-               throws Exception
-               {
-                       brokerSvc = new BrokerService();
-
-                       synchronized ( this.getClass() )
-                       {
-                               brokerNum = Next_broker_num;
-                               Next_broker_num++;
-                       }
-
-                       brokerName = "broker" + brokerNum;
-                       brokerId = "b" + brokerNum;
-
-                       brokerSvc.setBrokerName(brokerName);
-                       brokerSvc.setBrokerId(brokerId);
-
-                       brokerSvc.setPersistent(false);
-                       brokerSvc.setUseJmx(false); // TBD
-
-                       port = 60000 + ( brokerNum * 10 );
-
-                               // Configure the transport connector (TCP)
-                       tcpUrl = "tcp://127.0.0.1:" + Integer.toString(port);
-                       brokerSvc.addConnector(tcpUrl);
-               }
-
-               public Connection       createConnection ()
-               throws URISyntaxException, JMSException
-               {
-                       Connection      result;
-
-                       result = 
org.apache.activemq.ActiveMQConnection.makeConnection(this.tcpUrl);
-
-                       return  result;
-               }
-
-               public String   getConnectionUrl ()
-               {
-                       return  this.tcpUrl;
-               }
-
-
-               /**
-                * Create network connections to the given broker using the 
network-connector
-                * configuration of CORE brokers (e.g. 
core1.bus.dev1.coresys.tmcs)
-                *
-                * @param other
-                * @param duplex_f
-                */
-               public void coreConnectTo (EmbeddedTcpBroker other, boolean 
duplex_f)
-               throws Exception
-               {
-                       this.makeConnectionTo(other, duplex_f, true);
-                       this.makeConnectionTo(other, duplex_f, false);
-               }
-
-               public void start ()
-               throws Exception
-               {
-                       brokerSvc.start();
-                       //brokerSvc.waitUntilStarted();
-               }
-
-               public void stop ()
-               throws Exception
-               {
-                       brokerSvc.stop();
-               }
-
-
-               /**
-                * Make one connection to the other embedded broker, of the 
specified type (queue or topic)
-                * using the standard CORE broker networking.
-                * 
-                * @param other
-                * @param duplex_f
-                * @param queue_f
-                * @throws Exception
-                */
-               protected void  makeConnectionTo (EmbeddedTcpBroker other, 
boolean duplex_f, boolean queue_f)
-               throws Exception
-               {
-                       NetworkConnector        nw_conn;
-                       String                          prefix;
-                       ActiveMQDestination excl_dest;
-                       ArrayList                       excludes;
-
-                       nw_conn = new DiscoveryNetworkConnector(new 
URI("static:(" + other.tcpUrl + ")"));
-                       nw_conn.setDuplex(duplex_f);
-
-                       if ( queue_f )
-                               nw_conn.setConduitSubscriptions(false);
-                       else
-                               nw_conn.setConduitSubscriptions(true);
-
-                       nw_conn.setNetworkTTL(5);
-                       nw_conn.setSuppressDuplicateQueueSubscriptions(true);
-                       nw_conn.setDecreaseNetworkConsumerPriority(true);
-                       nw_conn.setBridgeTempDestinations(true);
-
-                       if ( queue_f )
-                       {
-                               prefix = "queue";
-                               excl_dest = 
ActiveMQDestination.createDestination(">", ActiveMQDestination.QUEUE_TYPE);
-                       }
-                       else
-                       {
-                               prefix = "topic";
-                               excl_dest = 
ActiveMQDestination.createDestination(">", ActiveMQDestination.TOPIC_TYPE);
-                       }
-
-                       excludes = new ArrayList();
-                       excludes.add(excl_dest);
-                       nw_conn.setExcludedDestinations(excludes);
-
-                       if ( duplex_f )
-                               nw_conn.setName(this.brokerId + "<-" + prefix + 
"->" + other.brokerId);
-                       else
-                               nw_conn.setName(this.brokerId + "-" + prefix + 
"->" + other.brokerId);
-
-                       brokerSvc.addNetworkConnector(nw_conn);
-               }
-       }
-
-       protected class MessageClient extends java.lang.Thread
-       {
-               protected MessageConsumer       msgCons;
-               protected boolean               shutdownInd;
-               protected int                   expectedCount;
-               protected int                   lastSeq = 0;
-               protected int                   msgCount = 0;
-               protected boolean               haveFirstSeq;
-               protected CountDownLatch        shutdownLatch;
-
-               public MessageClient (MessageConsumer cons, int num_to_expect)
-               {
-                       msgCons = cons;
-                       expectedCount = ( num_to_expect * ( echoResponseFill + 
1 ) );
-                       shutdownLatch = new CountDownLatch(1);
-               }
-
-               public void run ()
-               {
-                       CountDownLatch  latch;
-
-                       try
-                       {
-                               synchronized ( this )
-                               {
-                                       latch = shutdownLatch;
-                               }
-
-                               shutdownInd = false;
-                               processMessages();
-
-                               latch.countDown();
-                       }
-                       catch ( Exception exc )
-                       {
-                               LOG.error("message client error", exc);
-                       }
-               }
-
-               public void waitShutdown (long timeout)
-               {
-                       CountDownLatch  latch;
-
-                       try
-                       {
-                               synchronized ( this )
-                               {
-                                       latch = shutdownLatch;
-                               }
-
-                               if ( latch != null )
-                                       latch.await(timeout, 
TimeUnit.MILLISECONDS);
-                               else
-                                       LOG.info("echo client shutdown: client 
does not appear to be active");
-                       }
-                       catch ( InterruptedException int_exc )
-                       {
-                               LOG.warn("wait for message client shutdown 
interrupted", int_exc);
-                       }
-               }
-
-               public boolean shutdown ()
-               {
-                       boolean down_ind;
-
-                       if ( ! shutdownInd )
-                       {
-                               shutdownInd = true;
-                       }
-
-                       waitShutdown(200);
-
-                       synchronized ( this )
-                       {
-                               if ( ( shutdownLatch == null ) || ( 
shutdownLatch.getCount() == 0 ) )
-                                       down_ind = true;
-                               else
-                                       down_ind = false;
-                       }
-
-                       return  down_ind;
-               }
-
-               public int      getNumMsgReceived ()
-               {
-                       return  msgCount;
-               }
-
-               protected void processMessages ()
-               throws Exception
-               {
-                       Message in_msg;
-
-                       haveFirstSeq = false;
-
-                               //
-                               // Stop at shutdown time or after any test 
error is detected.
-                               //
-
-                       while ( ( ! shutdownInd ) && ( ! testError ) )
-                       {
-                               in_msg = msgCons.receive(100);
-
-                               if ( in_msg != null )
-                               {
-                                       msgCount++;
-                                       checkMessage(in_msg);
-                               }
-                       }
-               }
-
-               protected void  checkMessage (Message in_msg)
-               throws Exception
-               {
-                       int                             seq;
-
-                       LOG.debug("received message " + fmtMsgInfo(in_msg));
-
-                               //
-                               // Only check messages with a sequence number.
-                               //
-
-                       if ( in_msg.propertyExists("SEQ") )
-                       {
-                               seq = in_msg.getIntProperty("SEQ");
-
-                               if ( ( haveFirstSeq ) && ( seq != ( lastSeq + 1 
) ) )
-                               {
-                                       LOG.error("***ERROR*** incorrect 
sequence number; expected " +
-                                                 Integer.toString(lastSeq + 1) 
+ " but have " +
-                                                 Integer.toString(seq));
-
-                                       testError = true;
-                               }
-
-                               lastSeq = seq;
-
-                               if ( msgCount > expectedCount )
-                               {
-                                       LOG.warn("*** have more messages than 
expected; have "  + msgCount +
-                                                "; expect " + expectedCount);
-
-                                       testError = true;
-                               }
-                       }
-
-                       if ( in_msg.propertyExists("end-of-response") )
-                       {
-                               LOG.trace("received end-of-response message");
-                               shutdownInd = true;
-                       }
-               }
-       }
-
-       /**
-        *
-        */
-       protected class EchoService extends java.lang.Thread
-       {
-               protected String                destName;
-               protected Connection            jmsConn;
-               protected Session               sess;
-               protected MessageConsumer       msg_cons;
-               protected boolean               Shutdown_ind;
-
-               protected Destination           req_dest;
-               protected Destination           resp_dest;
-               protected MessageProducer       msg_prod;
-
-               protected CountDownLatch        waitShutdown;
-
-               public EchoService (String dest, Connection broker_conn)
-               throws Exception
-               {
-                       destName = dest;
-                       jmsConn = broker_conn;
-
-                       Shutdown_ind = false;
-
-                       sess = jmsConn.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-                       req_dest = sess.createQueue(destName);
-                       msg_cons = sess.createConsumer(req_dest);
-
-                       jmsConn.start();
-
-                       waitShutdown = new CountDownLatch(1);
-               }
-
-               public EchoService (String dest, String broker_url)
-               throws Exception
-               {
-                       this(dest, 
ActiveMQConnection.makeConnection(broker_url));
-               }
-
-               public void run ()
-               {
-                       Message req;
-
-                       try
-                       {
-                               LOG.info("STARTING ECHO SERVICE");
-
-                               while ( ! Shutdown_ind )
-                               {
-                                       req = msg_cons.receive(100);
-                                       if ( req != null )
-                                       {
-                                               if ( LOG.isDebugEnabled() )
-                                                       LOG.debug("ECHO request 
message " + req.toString());
-                                               
-                                               resp_dest = req.getJMSReplyTo();
-                                               if ( resp_dest != null )
-                                               {
-                                                       msg_prod = 
sess.createProducer(resp_dest);
-                                                       msg_prod.send(req);
-                                                       msg_prod.close();
-                                                       msg_prod = null;
-                                               }
-                                               else
-                                               {
-                                                       LOG.warn("invalid 
request: no reply-to destination given");
-                                               }
-                                       }
-                               }
-                       }
-                       catch (Exception ex)
-                       {
-                               LOG.error(null, ex);
-                       }
-                       finally
-                       {
-                               LOG.info("shutting down test echo service");
-
-                               try
-                               {
-                                       jmsConn.stop();
-                               }
-                               catch ( javax.jms.JMSException jms_exc )
-                               {
-                                       LOG.warn("error on shutting down JMS 
connection", jms_exc);
-                               }
-
-                               synchronized ( this )
-                               {
-                                       waitShutdown.countDown();
-                               }
-                       }
-               }
-
-
-               /**
-                * Shut down the service, waiting up to 3 seconds for the 
service to terminate.
-                */
-               public void shutdown ()
-               {
-                       CountDownLatch  wait_l;
-
-                       synchronized ( this )
-                       {
-                               wait_l = waitShutdown;
-                       }
-
-                       Shutdown_ind = true;
-                       
-                       try
-                       {
-                               if ( wait_l != null )
-                               {
-                                       if ( wait_l.await(3000, 
TimeUnit.MILLISECONDS) )
-                                               LOG.info("echo service shutdown 
complete");
-                                       else
-                                               LOG.warn("timeout waiting for 
echo service shutdown");
-                               }
-                               else
-                               {
-                                       LOG.info("echo service shutdown: 
service does not appear to be active");
-                               }
-                       }
-                       catch ( InterruptedException int_exc )
-                       {
-                               LOG.warn("interrupted while waiting for echo 
service shutdown");
-                       }
-               }
-       }
+    /** Source of unique broker numbers; read and incremented in the EmbeddedTcpBroker constructor. */
+    protected static int Next_broker_num = 0;
+    /** First embedded broker; created and networked to broker2 in the constructor. */
+    protected EmbeddedTcpBroker broker1;
+    /** Second embedded broker; created and networked to broker1 in the constructor. */
+    protected EmbeddedTcpBroker broker2;
+
+    /** Next numeric suffix for the "echo.queue.N" queue name handed out by testOneDest. */
+    protected int nextEchoId = 0;
+    /**
+     * Set to true as soon as any part of the test detects a failure. Volatile
+     * because it is written by the MessageClient thread and read by the thread
+     * driving the test.
+     */
+    protected volatile boolean testError = false;
+
+    /** Number of "filler" response messages per request. */
+    protected int echoResponseFill = 0;
+
+    /**
+     * Builds the two embedded brokers and networks each one to the other
+     * using duplex connectors. The brokers are not started here.
+     *
+     * @throws Exception if a broker or one of its connectors cannot be set up
+     */
+    public AMQ3274Test() throws Exception {
+        this.broker1 = new EmbeddedTcpBroker();
+        this.broker2 = new EmbeddedTcpBroker();
+
+        final boolean duplex = true;
+        this.broker1.coreConnectTo(this.broker2, duplex);
+        this.broker2.coreConnectTo(this.broker1, duplex);
+    }
+
+    /**
+     * Prints the given text on standard output and flushes the stream
+     * immediately.
+     *
+     * @param msg the text to print
+     */
+    public void logMessage(String msg) {
+        final java.io.PrintStream out = System.out;
+        out.println(msg);
+        out.flush();
+    }
+
+    /**
+     * Sends {@code num_msg} request messages through {@code req_prod}, each
+     * naming {@code resp_dest} as its reply-to destination, and verifies that
+     * the expected number of messages arrives on {@code resp_dest}. Sets
+     * {@code testError} when the received count differs from the expected one.
+     *
+     * @param sess      session used to create the response consumer and the messages
+     * @param req_prod  producer the requests are sent with
+     * @param resp_dest destination the responses are consumed from
+     * @param num_msg   number of requests to send
+     * @throws Exception on any JMS failure; the response consumer is closed and
+     *                   the consumer thread is asked to stop before it propagates
+     */
+    public void testMessages(Session sess, MessageProducer req_prod, Destination resp_dest, int num_msg) throws Exception {
+        MessageConsumer resp_cons = sess.createConsumer(resp_dest);
+
+        try {
+            MessageClient cons_client = new MessageClient(resp_cons, num_msg);
+            cons_client.start();
+
+            try {
+                for (int cur = 0; (cur < num_msg) && (!testError); cur++) {
+                    TextMessage msg = sess.createTextMessage("MSG AAAA " + cur);
+                    msg.setIntProperty("SEQ", 100 + cur);
+                    msg.setStringProperty("TEST", "TOPO");
+                    msg.setJMSReplyTo(resp_dest);
+
+                    // The final request is flagged so the consumer thread knows when to stop.
+                    if (cur == (num_msg - 1)) {
+                        msg.setBooleanProperty("end-of-response", true);
+                    }
+
+                    req_prod.send(msg);
+                }
+
+                cons_client.waitShutdown(5000);
+            } finally {
+                // Always stop the consumer thread, even when sending failed part-way.
+                if (cons_client.shutdown()) {
+                    LOG.debug("Consumer client shutdown complete");
+                } else {
+                    LOG.debug("Consumer client shutdown incomplete!!!");
+                }
+            }
+
+            int tot_expected = num_msg * (echoResponseFill + 1);
+
+            if (cons_client.getNumMsgReceived() == tot_expected) {
+                LOG.info("Have " + tot_expected + " messages, as-expected");
+            } else {
+                testError = true;
+                LOG.info("Have " + cons_client.getNumMsgReceived() + " messages; expected " + tot_expected);
+            }
+        } finally {
+            // Release the consumer on every path so a failure does not leak it.
+            resp_cons.close();
+        }
+    }
+
+    /**
+     * Test one destination between the given "producer broker" and
+     * "consumer broker" specified.
+     * <p>
+     * Starts an EchoService against {@code prod_broker_url} on a freshly
+     * numbered "echo.queue.N" queue, sends the requests to that queue through
+     * {@code sess}, and expects the echoed responses on {@code cons_dest}.
+     *
+     * @param conn            connection used to remove any stale echo queue
+     * @param sess            session used to create the request producer
+     * @param cons_dest       destination the responses are expected on
+     * @param prod_broker_url URL of the broker the echo service connects to
+     * @param cons_broker_url not used by this method
+     * @param num_msg         number of requests to send
+     * @throws Exception on any JMS failure; the echo service is shut down and
+     *                   the producer closed before it propagates
+     */
+    public void testOneDest(Connection conn, Session sess, Destination cons_dest, String prod_broker_url, String cons_broker_url, int num_msg) throws Exception {
+        int echo_id;
+
+        synchronized (this) {
+            echo_id = this.nextEchoId;
+            this.nextEchoId++;
+        }
+
+        String echo_queue_name = "echo.queue." + echo_id;
+
+        LOG.trace("destroying the echo queue in case an old one exists");
+        removeQueue(conn, echo_queue_name);
+
+        EchoService echo_svc = new EchoService(echo_queue_name, prod_broker_url);
+        echo_svc.start();
+
+        MessageProducer msg_prod = null;
+        try {
+            LOG.trace("Creating echo queue and producer");
+            Destination prod_dest = sess.createQueue(echo_queue_name);
+            msg_prod = sess.createProducer(prod_dest);
+
+            testMessages(sess, msg_prod, cons_dest, num_msg);
+        } finally {
+            // Stop the echo thread and release the producer on every path.
+            echo_svc.shutdown();
+            if (msg_prod != null) {
+                msg_prod.close();
+            }
+        }
+    }
+
+    /**
+     * TEST TEMPORARY TOPICS
+     * <p>
+     * Connects to {@code cons_broker_url}, creates a temporary topic there and
+     * runs {@link #testOneDest} with 5 messages.
+     *
+     * @param prod_broker_url URL of the broker the echo service connects to
+     * @param cons_broker_url URL of the broker the responses are consumed from
+     * @throws Exception on any JMS failure; the connection is closed first
+     */
+    public void testTempTopic(String prod_broker_url, String cons_broker_url) throws Exception {
+        final int num_msg = 5;
+
+        LOG.info("TESTING TEMP TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)");
+
+        Connection conn = createConnection(cons_broker_url);
+        try {
+            conn.start();
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            LOG.trace("Creating destination");
+            Destination cons_dest = sess.createTemporaryTopic();
+
+            testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
+
+            sess.close();
+        } finally {
+            // Closing the connection also releases the session if the test failed before sess.close().
+            conn.close();
+        }
+    }
+
+    /**
+     * TEST TOPICS
+     * <p>
+     * Connects to {@code cons_broker_url}, recreates the topic
+     * "topotest2.perm.topic" and runs {@link #testOneDest} with 5 messages.
+     * The topic is removed again when the run completes without an exception.
+     *
+     * @param prod_broker_url URL of the broker the echo service connects to
+     * @param cons_broker_url URL of the broker the responses are consumed from
+     * @throws Exception on any JMS failure; the connection is closed first
+     */
+    public void testTopic(String prod_broker_url, String cons_broker_url) throws Exception {
+        final int num_msg = 5;
+        final String topic_name = "topotest2.perm.topic";
+
+        LOG.info("TESTING TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)");
+
+        Connection conn = createConnection(cons_broker_url);
+        try {
+            conn.start();
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            LOG.trace("Removing existing Topic");
+            removeTopic(conn, topic_name);
+            LOG.trace("Creating Topic, " + topic_name);
+            Destination cons_dest = sess.createTopic(topic_name);
+
+            testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
+
+            removeTopic(conn, topic_name);
+            sess.close();
+        } finally {
+            // Closing the connection also releases the session if the test failed before sess.close().
+            conn.close();
+        }
+    }
+
+    /**
+     * TEST TEMPORARY QUEUES
+     * <p>
+     * Connects to {@code cons_broker_url}, creates a temporary queue there and
+     * runs {@link #testOneDest} with 5 messages.
+     *
+     * @param prod_broker_url URL of the broker the echo service connects to
+     * @param cons_broker_url URL of the broker the responses are consumed from
+     * @throws Exception on any JMS failure; the connection is closed first
+     */
+    public void testTempQueue(String prod_broker_url, String cons_broker_url) throws Exception {
+        final int num_msg = 5;
+
+        LOG.info("TESTING TEMP QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)");
+
+        Connection conn = createConnection(cons_broker_url);
+        try {
+            conn.start();
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            LOG.trace("Creating destination");
+            Destination cons_dest = sess.createTemporaryQueue();
+
+            testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
+
+            sess.close();
+        } finally {
+            // Closing the connection also releases the session if the test failed before sess.close().
+            conn.close();
+        }
+    }
+
+    /**
+     * TEST QUEUES
+     * <p>
+     * Connects to {@code cons_broker_url}, recreates the queue
+     * "topotest2.perm.queue" and runs {@link #testOneDest} with 5 messages.
+     * The queue is removed again when the run completes without an exception.
+     *
+     * @param prod_broker_url URL of the broker the echo service connects to
+     * @param cons_broker_url URL of the broker the responses are consumed from
+     * @throws Exception on any JMS failure; the connection is closed first
+     */
+    public void testQueue(String prod_broker_url, String cons_broker_url) throws Exception {
+        final int num_msg = 5;
+        final String queue_name = "topotest2.perm.queue";
+
+        LOG.info("TESTING QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)");
+
+        Connection conn = createConnection(cons_broker_url);
+        try {
+            conn.start();
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            LOG.trace("Removing existing Queue");
+            removeQueue(conn, queue_name);
+            LOG.trace("Creating Queue, " + queue_name);
+            Destination cons_dest = sess.createQueue(queue_name);
+
+            testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
+
+            removeQueue(conn, queue_name);
+            sess.close();
+        } finally {
+            // Closing the connection also releases the session if the test failed before sess.close().
+            conn.close();
+        }
+    }
+
+    /**
+     * Starts both brokers, runs the temp-topic, temp-queue, topic and queue
+     * scenarios from broker1 to broker2 (each one skipped once
+     * {@code testError} is set), stops the brokers and asserts that no error
+     * was recorded.
+     *
+     * @throws Exception if a scenario fails with a JMS error or the brokers
+     *                   cannot be stopped; the brokers are stopped on every path
+     */
+    @Test
+    public void run() throws Exception {
+        testError = false;
+
+        // Use threads to avoid startup deadlock since the first broker started waits until
+        // it knows the name of the remote broker before finishing its startup, which means
+        // the remote must already be running.
+
+        Thread start1 = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    broker1.start();
+                } catch (Exception ex) {
+                    LOG.error(null, ex);
+                    // A broker that failed to start must fail the test, not just be logged.
+                    testError = true;
+                }
+            }
+        };
+
+        Thread start2 = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    broker2.start();
+                } catch (Exception ex) {
+                    LOG.error(null, ex);
+                    // A broker that failed to start must fail the test, not just be logged.
+                    testError = true;
+                }
+            }
+        };
+
+        try {
+            start1.start();
+            start2.start();
+
+            start1.join();
+            start2.join();
+
+            if (!testError) {
+                this.testTempTopic(broker1.getConnectionUrl(), broker2.getConnectionUrl());
+            }
+            if (!testError) {
+                this.testTempQueue(broker1.getConnectionUrl(), broker2.getConnectionUrl());
+            }
+            if (!testError) {
+                this.testTopic(broker1.getConnectionUrl(), broker2.getConnectionUrl());
+            }
+            if (!testError) {
+                this.testQueue(broker1.getConnectionUrl(), broker2.getConnectionUrl());
+            }
+            Thread.sleep(100);
+        } finally {
+            // Stop the brokers even when a scenario throws, so they do not outlive the test.
+            shutdown();
+        }
+
+        assertTrue(!testError);
+    }
+
+    /**
+     * Stops both embedded brokers. broker2 is stopped even when stopping
+     * broker1 throws.
+     *
+     * @throws Exception if either broker fails to stop
+     */
+    public void shutdown() throws Exception {
+        try {
+            broker1.stop();
+        } finally {
+            broker2.stop();
+        }
+    }
+
+    /**
+     * Runs the test stand-alone, outside of JUnit. Exits with status 1 when
+     * the run throws, so a calling script can detect the failure.
+     *
+     * @param args
+     *            the command line arguments (not used)
+     */
+    public static void main(String[] args) {
+        AMQ3274Test main_obj;
+
+        try {
+            main_obj = new AMQ3274Test();
+            main_obj.run();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            LOG.error(null, ex);
+            // Non-zero status: the original exited with 0, which reported failure as success.
+            System.exit(1);
+        }
+    }
+
+    /**
+     * Opens a JMS connection to the broker at the given URL through
+     * {@code ActiveMQConnection.makeConnection}. The connection is returned
+     * without being started by this method.
+     *
+     * @param url the broker URL to connect to
+     * @return the newly created connection
+     * @throws Exception if the connection cannot be created
+     */
+    protected Connection createConnection(String url) throws Exception {
+        final Connection opened = ActiveMQConnection.makeConnection(url);
+        return opened;
+    }
+
+    /**
+     * Asks the broker to destroy the named queue. Does nothing when the
+     * connection is not an {@code ActiveMQConnection}.
+     *
+     * @param conn      connection to the broker that holds the queue
+     * @param dest_name name of the queue to destroy
+     * @throws java.lang.Exception if the broker rejects the request
+     */
+    protected static void removeQueue(Connection conn, String dest_name) throws java.lang.Exception {
+        if (!(conn instanceof ActiveMQConnection)) {
+            return;
+        }
+
+        final ActiveMQConnection amqConn = (ActiveMQConnection) conn;
+        final ActiveMQDestination target =
+                ActiveMQDestination.createDestination(dest_name, (byte) ActiveMQDestination.QUEUE_TYPE);
+        amqConn.destroyDestination(target);
+    }
+
+    /**
+     * Asks the broker to destroy the named topic. Does nothing when the
+     * connection is not an {@code ActiveMQConnection}.
+     *
+     * @param conn      connection to the broker that holds the topic
+     * @param dest_name name of the topic to destroy
+     * @throws java.lang.Exception if the broker rejects the request
+     */
+    protected static void removeTopic(Connection conn, String dest_name) throws java.lang.Exception {
+        if (!(conn instanceof ActiveMQConnection)) {
+            return;
+        }
+
+        final ActiveMQConnection amqConn = (ActiveMQConnection) conn;
+        final ActiveMQDestination target =
+                ActiveMQDestination.createDestination(dest_name, (byte) ActiveMQDestination.TOPIC_TYPE);
+        amqConn.destroyDestination(target);
+    }
+
+    /**
+     * Builds a one-line description of a message for logging: the text body
+     * for a {@code TextMessage} (otherwise the class name in square brackets),
+     * followed by "; name=value" for every message property.
+     *
+     * @param msg the message to describe
+     * @return the formatted description
+     * @throws Exception if the body or a property cannot be read
+     */
+    @SuppressWarnings("rawtypes")
+    public static String fmtMsgInfo(Message msg) throws Exception {
+        StringBuilder msg_desc = new StringBuilder();
+
+        if (msg instanceof TextMessage) {
+            msg_desc.append(((TextMessage) msg).getText());
+        } else {
+            msg_desc.append("[");
+            msg_desc.append(msg.getClass().getName());
+            msg_desc.append("]");
+        }
+
+        // getPropertyNames() returns a raw Enumeration in the JMS API, hence the cast below.
+        Enumeration prop_enum = msg.getPropertyNames();
+        while (prop_enum.hasMoreElements()) {
+            String prop = (String) prop_enum.nextElement();
+            msg_desc.append("; ");
+            msg_desc.append(prop);
+            msg_desc.append("=");
+            msg_desc.append(msg.getStringProperty(prop));
+        }
+
+        return msg_desc.toString();
+    }
+
+    // ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+    // /////////////////////////////////////////////// INTERNAL CLASSES
+    // /////////////////////////////////////////////////
+    // ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+    /**
+     * Wraps a non-persistent, JMX-less {@code BrokerService} that listens on a
+     * TCP port chosen at bind time ("tcp://localhost:0") and can be networked
+     * to another instance of this class.
+     */
+    protected class EmbeddedTcpBroker {
+        protected BrokerService brokerSvc;
+        protected int brokerNum;
+        protected String brokerName;
+        protected String brokerId;
+        protected int port; // never assigned in this class
+        protected String tcpUrl;
+
+        /**
+         * Creates the broker service, names it "brokerN" with id "bN" from the
+         * shared counter, and adds the TCP connector whose publishable connect
+         * string becomes {@link #getConnectionUrl()}.
+         */
+        public EmbeddedTcpBroker() throws Exception {
+            this.brokerSvc = new BrokerService();
+
+            // Take the next broker number under the class lock so numbers are unique.
+            synchronized (this.getClass()) {
+                this.brokerNum = Next_broker_num;
+                Next_broker_num++;
+            }
+
+            this.brokerName = "broker" + this.brokerNum;
+            this.brokerId = "b" + this.brokerNum;
+
+            this.brokerSvc.setBrokerName(this.brokerName);
+            this.brokerSvc.setBrokerId(this.brokerId);
+            this.brokerSvc.setPersistent(false);
+            this.brokerSvc.setUseJmx(false);
+            this.tcpUrl = this.brokerSvc.addConnector("tcp://localhost:0").getPublishableConnectString();
+        }
+
+        /** Opens a new client connection to this broker's TCP URL. */
+        public Connection createConnection() throws URISyntaxException, JMSException {
+            return org.apache.activemq.ActiveMQConnection.makeConnection(this.tcpUrl);
+        }
+
+        /** Returns the URL clients and peer brokers use to reach this broker. */
+        public String getConnectionUrl() {
+            return this.tcpUrl;
+        }
+
+        /**
+         * Create network connections to the given broker using the
+         * network-connector configuration of CORE brokers (e.g.
+         * core1.bus.dev1.coresys.tmcs): one connector built with the queue
+         * flag set, followed by one built with it cleared.
+         *
+         * @param other
+         *            the broker to connect to
+         * @param duplex_f
+         *            whether the connectors are duplex
+         */
+        public void coreConnectTo(EmbeddedTcpBroker other, boolean duplex_f) throws Exception {
+            final boolean[] queueFlags = { true, false };
+            for (boolean queueFlag : queueFlags) {
+                this.makeConnectionTo(other, duplex_f, queueFlag);
+            }
+        }
+
+        /** Starts the wrapped broker service. */
+        public void start() throws Exception {
+            this.brokerSvc.start();
+        }
+
+        /** Stops the wrapped broker service. */
+        public void stop() throws Exception {
+            this.brokerSvc.stop();
+        }
+
+        /**
+         * Make one connection to the other embedded broker, of the specified
+         * type (queue or topic) using the standard CORE broker networking.
+         *
+         * @param other
+         *            the broker to connect to
+         * @param duplex_f
+         *            whether the connector is duplex
+         * @param queue_f
+         *            true for the "queue" connector, false for the "topic" one
+         * @throws Exception
+         *             if the connector cannot be created or added
+         */
+        protected void makeConnectionTo(EmbeddedTcpBroker other, boolean duplex_f, boolean queue_f) throws Exception {
+            final URI remote = new URI("static:(" + other.tcpUrl + ")");
+            final NetworkConnector bridge = new DiscoveryNetworkConnector(remote);
+
+            bridge.setDuplex(duplex_f);
+            // Conduit subscriptions are switched off for the queue connector and on for the topic one.
+            bridge.setConduitSubscriptions(!queue_f);
+            bridge.setNetworkTTL(5);
+            bridge.setSuppressDuplicateQueueSubscriptions(true);
+            bridge.setDecreaseNetworkConsumerPriority(true);
+            bridge.setBridgeTempDestinations(true);
+
+            final String kind = queue_f ? "queue" : "topic";
+            final byte destType = queue_f ? ActiveMQDestination.QUEUE_TYPE : ActiveMQDestination.TOPIC_TYPE;
+
+            // NOTE(review): the connector named "queue" excludes every queue (">") and the
+            // "topic" one every topic; confirm this is the intended CORE configuration.
+            final ArrayList<ActiveMQDestination> excluded = new ArrayList<ActiveMQDestination>();
+            excluded.add(ActiveMQDestination.createDestination(">", destType));
+            bridge.setExcludedDestinations(excluded);
+
+            final String arrowStart = duplex_f ? "<-" : "-";
+            bridge.setName(this.brokerId + arrowStart + kind + "->" + other.brokerId);
+
+            this.brokerSvc.addNetworkConnector(bridge);
+        }
+    }
+
+    /**
+     * Consumer thread that receives messages from a {@code MessageConsumer}
+     * until it is told to stop, an "end-of-response" message arrives, or
+     * {@code testError} is set. It counts the messages and checks that "SEQ"
+     * values increase by one from message to message.
+     */
+    protected class MessageClient extends java.lang.Thread {
+        protected MessageConsumer msgCons;
+        // Volatile: set by the controlling thread in shutdown(), polled by this thread.
+        protected volatile boolean shutdownInd;
+        protected int expectedCount;
+        protected int lastSeq = 0;
+        // Volatile: incremented only by this thread, read by the controlling thread.
+        protected volatile int msgCount = 0;
+        protected boolean haveFirstSeq;
+        protected CountDownLatch shutdownLatch;
+
+        /**
+         * @param cons          consumer to receive from
+         * @param num_to_expect number of requests sent; the expected message
+         *                      count is this times (echoResponseFill + 1)
+         */
+        public MessageClient(MessageConsumer cons, int num_to_expect) {
+            msgCons = cons;
+            expectedCount = (num_to_expect * (echoResponseFill + 1));
+            shutdownLatch = new CountDownLatch(1);
+        }
+
+        /**
+         * Processes messages until stopped, then releases the shutdown latch.
+         * The latch is released even when processing fails, so waiters do not
+         * sit out their full timeout.
+         */
+        @Override
+        public void run() {
+            CountDownLatch latch;
+
+            synchronized (this) {
+                latch = shutdownLatch;
+            }
+
+            try {
+                // shutdownInd is deliberately not reset here: a shutdown() request made
+                // before this thread gets going must not be lost.
+                processMessages();
+            } catch (Exception exc) {
+                LOG.error("message client error", exc);
+            } finally {
+                latch.countDown();
+            }
+        }
+
+        /**
+         * Waits up to {@code timeout} milliseconds for this client to finish.
+         *
+         * @param timeout maximum time to wait, in milliseconds
+         */
+        public void waitShutdown(long timeout) {
+            CountDownLatch latch;
+
+            try {
+                synchronized (this) {
+                    latch = shutdownLatch;
+                }
+
+                if (latch != null) {
+                    latch.await(timeout, TimeUnit.MILLISECONDS);
+                } else {
+                    LOG.info("echo client shutdown: client does not appear to be active");
+                }
+            } catch (InterruptedException int_exc) {
+                LOG.warn("wait for message client shutdown interrupted", int_exc);
+                // Preserve the interrupt for the caller.
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        /**
+         * Asks the client to stop and waits up to 200 ms for it to do so.
+         *
+         * @return true if the client has finished, false if it is still running
+         */
+        public boolean shutdown() {
+            shutdownInd = true;
+
+            waitShutdown(200);
+
+            synchronized (this) {
+                return (shutdownLatch == null) || (shutdownLatch.getCount() == 0);
+            }
+        }
+
+        /** Returns the number of messages received so far. */
+        public int getNumMsgReceived() {
+            return msgCount;
+        }
+
+        /**
+         * Polls the consumer with a 100 ms timeout until a stop is requested
+         * or a test error is flagged, counting and checking each message.
+         */
+        protected void processMessages() throws Exception {
+            Message in_msg;
+
+            haveFirstSeq = false;
+            while ((!shutdownInd) && (!testError)) {
+                in_msg = msgCons.receive(100);
+
+                if (in_msg != null) {
+                    msgCount++;
+                    checkMessage(in_msg);
+                }
+            }
+        }
+
+        /**
+         * Validates one received message: flags a test error when its "SEQ"
+         * does not follow the previous one or when more messages than expected
+         * have arrived, and requests shutdown on "end-of-response".
+         */
+        protected void checkMessage(Message in_msg) throws Exception {
+            int seq;
+
+            LOG.debug("received message " + fmtMsgInfo(in_msg));
+
+            if (in_msg.propertyExists("SEQ")) {
+                seq = in_msg.getIntProperty("SEQ");
+
+                if ((haveFirstSeq) && (seq != (lastSeq + 1))) {
+                    LOG.error("***ERROR*** incorrect sequence number; expected " + Integer.toString(lastSeq + 1) + " but have " + Integer.toString(seq));
+
+                    testError = true;
+                }
+
+                lastSeq = seq;
+                // Without this the sequence check above could never fire.
+                haveFirstSeq = true;
+
+                if (msgCount > expectedCount) {
+                    LOG.warn("*** have more messages than expected; have " + msgCount + "; expect " + expectedCount);
+
+                    testError = true;
+                }
+            }
+
+            if (in_msg.propertyExists("end-of-response")) {
+                LOG.trace("received end-of-response message");
+                shutdownInd = true;
+            }
+        }
+    }
+
+    /**
+     * Thread that consumes requests from a queue and sends each one back,
+     * unchanged, to the request's reply-to destination until
+     * {@link #shutdown()} is called.
+     */
+    protected class EchoService extends java.lang.Thread {
+        protected String destName;
+        protected Connection jmsConn;
+        protected Session sess;
+        protected MessageConsumer msg_cons;
+        // Volatile: set by the controlling thread in shutdown(), polled by this thread.
+        protected volatile boolean Shutdown_ind;
+
+        protected Destination req_dest;
+        protected Destination resp_dest;
+        protected MessageProducer msg_prod;
+
+        protected CountDownLatch waitShutdown;
+
+        // True when this service opened jmsConn itself and must therefore close it.
+        private boolean ownsConnection;
+
+        /**
+         * Creates a service on a caller-supplied connection. The connection is
+         * started here; when the service ends it is stopped but not closed.
+         *
+         * @param dest        name of the queue to consume requests from
+         * @param broker_conn connection to use
+         */
+        public EchoService(String dest, Connection broker_conn) throws Exception {
+            destName = dest;
+            jmsConn = broker_conn;
+
+            Shutdown_ind = false;
+
+            sess = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            req_dest = sess.createQueue(destName);
+            msg_cons = sess.createConsumer(req_dest);
+
+            jmsConn.start();
+
+            waitShutdown = new CountDownLatch(1);
+        }
+
+        /**
+         * Creates a service on its own connection to the given broker URL.
+         * That connection is closed when the service ends.
+         *
+         * @param dest       name of the queue to consume requests from
+         * @param broker_url URL of the broker to connect to
+         */
+        public EchoService(String dest, String broker_url) throws Exception {
+            this(dest, ActiveMQConnection.makeConnection(broker_url));
+            ownsConnection = true;
+        }
+
+        /**
+         * Echo loop: polls for requests with a 100 ms timeout and replies to
+         * each one that names a reply-to destination. On exit the connection
+         * is released and the shutdown latch is counted down.
+         */
+        @Override
+        public void run() {
+            Message req;
+
+            try {
+                LOG.info("STARTING ECHO SERVICE");
+
+                while (!Shutdown_ind) {
+                    req = msg_cons.receive(100);
+                    if (req != null) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("ECHO request message " + req.toString());
+                        }
+
+                        resp_dest = req.getJMSReplyTo();
+                        if (resp_dest != null) {
+                            msg_prod = sess.createProducer(resp_dest);
+                            msg_prod.send(req);
+                            msg_prod.close();
+                            msg_prod = null;
+                        } else {
+                            LOG.warn("invalid request: no reply-to destination given");
+                        }
+                    }
+                }
+            } catch (Exception ex) {
+                LOG.error(null, ex);
+            } finally {
+                LOG.info("shutting down test echo service");
+
+                try {
+                    if (ownsConnection) {
+                        // A connection opened by this service would otherwise never be closed.
+                        jmsConn.close();
+                    } else {
+                        jmsConn.stop();
+                    }
+                } catch (javax.jms.JMSException jms_exc) {
+                    LOG.warn("error on shutting down JMS connection", jms_exc);
+                } finally {
+                    synchronized (this) {
+                        waitShutdown.countDown();
+                    }
+                }
+            }
+        }
+
+        /**
+         * Shut down the service, waiting up to 3 seconds for the service to
+         * terminate.
+         */
+        public void shutdown() {
+            CountDownLatch wait_l;
+
+            synchronized (this) {
+                wait_l = waitShutdown;
+            }
+
+            Shutdown_ind = true;
+
+            try {
+                if (wait_l != null) {
+                    if (wait_l.await(3000, TimeUnit.MILLISECONDS)) {
+                        LOG.info("echo service shutdown complete");
+                    } else {
+                        LOG.warn("timeout waiting for echo service shutdown");
+                    }
+                } else {
+                    LOG.info("echo service shutdown: service does not appear to be active");
+                }
+            } catch (InterruptedException int_exc) {
+                LOG.warn("interrupted while waiting for echo service shutdown");
+                // Preserve the interrupt for the caller.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
 }


Reply via email to