this breaks the build, getMBeanServer() is intentionally protected in ManagementContext.
> activemq_t/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java:[364,33] getMBeanServer() has protected access in org.apache.activemq.broker.jmx.ManagementContext 2009/11/23 <[email protected]> > Author: bsnyder > Date: Mon Nov 23 17:02:39 2009 > New Revision: 883411 > > URL: http://svn.apache.org/viewvc?rev=883411&view=rev > Log: > Updated test for AMQ-2324 and AMQ-2484 > > Modified: > > > activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java > > Modified: > activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java > URL: > http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java?rev=883411&r1=883410&r2=883411&view=diff > > ============================================================================== > --- > activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java > (original) > +++ > activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java > Mon Nov 23 17:02:39 2009 > @@ -2,35 +2,102 @@ > > import java.io.File; > import java.io.IOException; > +import java.net.URI; > +import java.net.URISyntaxException; > +import java.util.ArrayList; > +import java.util.Enumeration; > +import java.util.HashMap; > +import java.util.Hashtable; > +import java.util.List; > +import java.util.Map; > +import java.util.concurrent.TimeUnit; > > +import javax.jms.Connection; > import javax.jms.DeliveryMode; > - > -import junit.framework.Test; > - > +import javax.jms.MessageNotWriteableException; > +import javax.jms.Queue; > +import javax.jms.QueueBrowser; > +import javax.jms.Session; > +import javax.management.MBeanServer; > +import javax.management.ObjectName; > + > +import junit.framework.TestCase; > + > +import org.apache.activemq.ActiveMQConnectionFactory; > +import org.apache.activemq.broker.BrokerService; > +import org.apache.activemq.broker.BrokerTestSupport; > import org.apache.activemq.broker.StubConnection; > +import org.apache.activemq.broker.TransportConnector; > +import org.apache.activemq.broker.jmx.ManagementContext; > import org.apache.activemq.command.ActiveMQDestination; > -import org.apache.activemq.command.Command; > +import org.apache.activemq.command.ActiveMQTextMessage; > +import org.apache.activemq.command.ConnectionId; > import org.apache.activemq.command.ConnectionInfo; > import org.apache.activemq.command.ConsumerInfo; > +import org.apache.activemq.command.DestinationInfo; > import org.apache.activemq.command.Message; > import org.apache.activemq.command.MessageAck; > +import org.apache.activemq.command.MessageDispatch; > +import org.apache.activemq.command.MessageId; > import org.apache.activemq.command.ProducerInfo; > import org.apache.activemq.command.SessionInfo; > import org.apache.activemq.transport.Transport; > -import org.apache.activemq.transport.TransportListener; > +import org.apache.activemq.transport.TransportFactory; > import org.apache.activemq.util.Wait; > import org.apache.commons.io.FileUtils; > import org.apache.commons.logging.Log; > import org.apache.commons.logging.LogFactory; > > -public class BrokerNetworkWithStuckMessagesTest extends NetworkTestSupport > { > - > +/** > + * This class duplicates most of the functionality in {...@link > NetworkTestSupport} > + * and {...@link BrokerTestSupport} because more control was needed over how > brokers > + * and connectors are created. Also, this test asserts message counts via > JMX on > + * each broker. > + * > + * @author bsnyder > + * > + */ > +public class BrokerNetworkWithStuckMessagesTest extends TestCase > /*NetworkTestSupport*/ { > + > private static final Log LOG = > LogFactory.getLog(BrokerNetworkWithStuckMessagesTest.class); > - > - private DemandForwardingBridge bridge; > - > - protected void setUp() throws Exception { > - super.setUp(); > + > + private BrokerService localBroker; > + private BrokerService remoteBroker; > + private DemandForwardingBridge bridge; > + > + protected Map<String, BrokerService> brokers = new HashMap<String, > BrokerService>(); > + protected ArrayList connections = new ArrayList(); > + > + protected TransportConnector connector; > + protected TransportConnector remoteConnector; > + > + protected long idGenerator; > + protected int msgIdGenerator; > + protected int tempDestGenerator; > + protected int maxWait = 4000; > + protected String queueName = "TEST"; > + > + protected String amqDomain = "org.apache.activemq"; > + > + protected void setUp() throws Exception { > + > + // For those who want visual confirmation: > + // Uncomment the following to enable JMX support on a port > number to use > + // Jconsole to view each broker. You will need to add some calls > to > + // Thread.sleep() to be able to actually slow things down so > that you > + // can manually see JMX attrs. > +// System.setProperty("com.sun.management.jmxremote", ""); > +// System.setProperty("com.sun.management.jmxremote.port", "1099"); > +// System.setProperty("com.sun.management.jmxremote.authenticate", > "false"); > +// System.setProperty("com.sun.management.jmxremote.ssl", "false"); > + > + // Create the local broker > + createBroker(); > + // Create the remote broker > + createRemoteBroker(); > + > + // Remove the activemq-data directory from the creation of the > remote broker > + FileUtils.deleteDirectory(new File("activemq-data")); > > // Create a network bridge between the local and remote brokers so > that > // demand-based forwarding can take place > @@ -39,79 +106,42 @@ > config.setDispatchAsync(false); > > Transport localTransport = createTransport(); > - localTransport.setTransportListener(new TransportListener() { > - Command command = null; > - public void onCommand(Object o) { > - this.command = (Command) o; > - LOG.info("Command from [" + > command.getFrom() + "] to [" + command.getTo() + "]"); > - } > - > - public void onException(IOException error) { > - LOG.info("Command from [" + > command.getFrom() + "] to [" + command.getTo() + "]"); > - LOG.info("Exception: " + error); > - } > - > - public void transportInterupted() { > - LOG.info("Interruption on local > transport"); > - } > - > - public void transportResumed() { > - LOG.info("Resumption on local transport"); > - } > - }); > - > Transport remoteTransport = createRemoteTransport(); > - remoteTransport.setTransportListener(new TransportListener() { > - Command command = null; > - public void onCommand(Object o) { > - this.command = (Command) o; > - LOG.info("Command from [" + > command.getFrom() + "] to [" + command.getTo() + "]"); > - } > - > - public void onException(IOException error) { > - LOG.info("Command from [" + > command.getFrom() + "] to [" + command.getTo() + "]"); > - LOG.info("Exception: " + error); > - } > - > - public void transportInterupted() { > - LOG.info("Interruption on remote > transport"); > - } > - > - public void transportResumed() { > - LOG.info("Resumption on remote transport"); > - } > - }); > > + // Create a network bridge between the two brokers > bridge = new DemandForwardingBridge(config, localTransport, > remoteTransport); > - bridge.setBrokerService(broker); > + bridge.setBrokerService(localBroker); > bridge.start(); > > - // Enable JMX support on the local and remote brokers > -// broker.setUseJmx(true); > -// remoteBroker.setUseJmx(true); > - > - // Make sure persistence is disabled > - broker.setPersistent(false); > - broker.setPersistenceAdapter(null); > - remoteBroker.setPersistent(false); > - remoteBroker.setPersistenceAdapter(null); > + waitForBridgeFormation(); > > - // Remove the activemq-data directory from the creation of the > remote broker > - FileUtils.deleteDirectory(new File("activemq-data")); > } > - > - protected void tearDown() throws Exception { > + > + protected void waitForBridgeFormation() throws Exception { > + for (final BrokerService broker : brokers.values()) { > + if (!broker.getNetworkConnectors().isEmpty()) { > + // Max wait here is 30 secs > + Wait.waitFor(new Wait.Condition() { > + public boolean isSatisified() throws Exception { > + return > !broker.getNetworkConnectors().get(0).activeBridges().isEmpty(); > + }}); > + } > + } > + } > + > + protected void tearDown() throws Exception { > bridge.stop(); > - super.tearDown(); > + localBroker.stop(); > + remoteBroker.stop(); > } > > - public void testBrokerNetworkWithStuckMessages() throws Exception { > - > - int sendNumMessages = 10; > - int receiveNumMessages = 5; > - > - // Create a producer and send a batch of 10 messages to the > local broker > - StubConnection connection1 = createConnection(); > + public void testBrokerNetworkWithStuckMessages() throws Exception { > + > + int sendNumMessages = 10; > + int receiveNumMessages = 5; > + > + // Create a producer > + StubConnection connection1 = createConnection(); > ConnectionInfo connectionInfo1 = createConnectionInfo(); > SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); > ProducerInfo producerInfo = createProducerInfo(sessionInfo1); > @@ -122,25 +152,25 @@ > // Create a destination on the local broker > ActiveMQDestination destinationInfo1 = null; > > + // Send a 10 messages to the local broker > for (int i = 0; i < sendNumMessages; ++i) { > - destinationInfo1 = createDestinationInfo(connection1, > connectionInfo1, ActiveMQDestination.QUEUE_TYPE); > -// connection1.send(createMessage(producerInfo, > destinationInfo1, DeliveryMode.NON_PERSISTENT)); > - connection1.request(createMessage(producerInfo, > destinationInfo1, DeliveryMode.NON_PERSISTENT)); > + destinationInfo1 = createDestinationInfo(connection1, > connectionInfo1, ActiveMQDestination.QUEUE_TYPE); > + connection1.request(createMessage(producerInfo, > destinationInfo1, DeliveryMode.NON_PERSISTENT)); > } > > // Ensure that there are 10 messages on the local broker > - int messageCount1 = countMessagesInQueue(connection1, > connectionInfo1, destinationInfo1); > - assertEquals(10, messageCount1); > + Object[] messages = browseQueueWithJmx(localBroker); > + assertEquals(sendNumMessages, messages.length); > > > - // Create a consumer on the remote broker > + // Create a synchronous consumer on the remote broker > final StubConnection connection2 = createRemoteConnection(); > ConnectionInfo connectionInfo2 = createConnectionInfo(); > SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); > connection2.send(connectionInfo2); > connection2.send(sessionInfo2); > ActiveMQDestination destinationInfo2 = > - createDestinationInfo(connection2, connectionInfo2, > ActiveMQDestination.QUEUE_TYPE); > + createDestinationInfo(connection2, connectionInfo2, > ActiveMQDestination.QUEUE_TYPE); > final ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, > destinationInfo2); > connection2.send(consumerInfo2); > > @@ -149,32 +179,27 @@ > // method, this will cause the messages on the local broker to be > // forwarded to the remote broker. > for (int i = 0; i < receiveNumMessages; ++i) { > - assertTrue("Message " + i + " was not received", > Wait.waitFor(new Wait.Condition() { > - public boolean isSatisified() throws Exception { > - Message message1 = > receiveMessage(connection2); > - assertNotNull(message1); > - connection2.send(createAck(consumerInfo2, > message1, 1, MessageAck.STANDARD_ACK_TYPE)); > - return message1 != null; > - } > - })); > -// Message message1 = receiveMessage(connection2); > -// assertNotNull(message1); > -// connection2.send(createAck(consumerInfo2, message1, 1, > MessageAck.STANDARD_ACK_TYPE)); > + Message message1 = receiveMessage(connection2); > + assertNotNull(message1); > + connection2.send(createAck(consumerInfo2, message1, 1, > MessageAck.STANDARD_ACK_TYPE)); > + > + Object[] msgs1 = browseQueueWithJmx(remoteBroker); > + LOG.info("Found [" + msgs1.length + "] messages with JMX"); > +// assertEquals((sendNumMessages-i), msgs.length); > } > > - // Close the consumer on the remote broker > - connection2.send(consumerInfo2.createRemoveCommand()); > - > // Ensure that there are zero messages on the local broker. This > tells > // us that those messages have been prefetched to the remote broker > // where the demand exists. > - int messageCount2 = countMessagesInQueue(connection1, > connectionInfo1, destinationInfo1); > -// Sometimes it fails here > - assertEquals(0, messageCount2); > + messages = browseQueueWithJmx(localBroker); > + assertEquals(0, messages.length); > + > + // Close the consumer on the remote broker > + connection2.send(consumerInfo2.createRemoveCommand()); > > // There should now be 5 messages stuck on the remote broker > - int messageCount3 = countMessagesInQueue(connection2, > connectionInfo2, destinationInfo2); > - assertEquals(5, messageCount3); > + messages = browseQueueWithJmx(remoteBroker); > + assertEquals(5, messages.length); > > // Create a consumer on the local broker just to confirm that it > doesn't > // receive any messages > @@ -182,13 +207,13 @@ > connection1.send(consumerInfo1); > Message message1 = receiveMessage(connection1); > > - ////////////////////////////////////////////////////// > + ////////////////////////////////////////////////////// > // An assertNull() is done here because this is currently the > correct > // behavior. This is actually the purpose of this test - to prove > that > // messages are stuck on the remote broker. AMQ-2324 and AMQ-2484 > aim > // to fix this situation so that messages don't get stuck. > assertNull(message1); > - ////////////////////////////////////////////////////// > + ////////////////////////////////////////////////////// > > ConsumerInfo consumerInfo3 = createConsumerInfo(sessionInfo2, > destinationInfo2); > connection2.send(consumerInfo3); > @@ -197,30 +222,247 @@ > // to clean up the queue. > int counter = 0; > for (int i = 0; i < receiveNumMessages; ++i) { > - message1 = receiveMessage(connection2); > - assertNotNull(message1); > + message1 = receiveMessage(connection2); > + assertNotNull(message1); > connection2.send(createAck(consumerInfo3, message1, 1, > MessageAck.STANDARD_ACK_TYPE)); > ++counter; > } > // Ensure that 5 messages were received > assertEquals(receiveNumMessages, counter); > > - Thread.sleep(2000); > + // Let those acks percolate... This stinks but it's the only way > currently > + // because these types of internal broker actions are > non-deterministic. > + Thread.sleep(4000); > > // Ensure that the queue on the remote broker is empty > - int messageCount4 = countMessagesInQueue(connection2, > connectionInfo2, destinationInfo1); > -// Sometimes it fails here > - assertEquals(0, messageCount4); > + messages = browseQueueWithJmx(remoteBroker); > + assertEquals(0, messages.length); > > // Close the consumer on the remote broker > connection2.send(consumerInfo3.createRemoveCommand()); > > connection1.stop(); > connection2.stop(); > + } > + > + protected BrokerService createBroker() throws Exception { > + localBroker = new BrokerService(); > + localBroker.setBrokerName("localhost"); > + localBroker.setUseJmx(true); > + localBroker.setPersistenceAdapter(null); > + localBroker.setPersistent(false); > + connector = createConnector(); > + localBroker.addConnector(connector); > + localBroker.start(); > + localBroker.waitUntilStarted(); > + > + localBroker.getManagementContext().setConnectorPort(2221); > + > + brokers.put(localBroker.getBrokerName(), localBroker); > + > + return localBroker; > + } > + > + protected BrokerService createRemoteBroker() throws Exception { > + remoteBroker = new BrokerService(); > + remoteBroker.setBrokerName("remotehost"); > + remoteBroker.setUseJmx(true); > + remoteBroker.setPersistenceAdapter(null); > + remoteBroker.setPersistent(false); > + remoteConnector = createRemoteConnector(); > + remoteBroker.addConnector(remoteConnector); > + remoteBroker.waitUntilStarted(); > + > + remoteBroker.getManagementContext().setConnectorPort(2222); > + > + brokers.put(remoteBroker.getBrokerName(), remoteBroker); > + > + return remoteBroker; > + } > + > + protected Transport createTransport() throws Exception { > + Transport transport = > TransportFactory.connect(connector.getServer().getConnectURI()); > + return transport; > + } > + > + protected Transport createRemoteTransport() throws Exception { > + Transport transport = > TransportFactory.connect(remoteConnector.getServer().getConnectURI()); > + return transport; > + } > + > + protected TransportConnector createConnector() throws Exception, > IOException, URISyntaxException { > + return new TransportConnector(TransportFactory.bind(new > URI(getLocalURI()))); > + } > + > + protected TransportConnector createRemoteConnector() throws Exception, > IOException, URISyntaxException { > + return new TransportConnector(TransportFactory.bind(new > URI(getRemoteURI()))); > + } > + > + protected String getRemoteURI() { > + return "vm://remotehost"; > + } > + > + protected String getLocalURI() { > + return "vm://localhost"; > + } > + > + protected StubConnection createConnection() throws Exception { > + Transport transport = > TransportFactory.connect(connector.getServer().getConnectURI()); > + StubConnection connection = new StubConnection(transport); > + connections.add(connection); > + return connection; > + } > + > + protected StubConnection createRemoteConnection() throws Exception { > + Transport transport = > TransportFactory.connect(remoteConnector.getServer().getConnectURI()); > + StubConnection connection = new StubConnection(transport); > + connections.add(connection); > + return connection; > + } > + > + @SuppressWarnings("unchecked") > + private Object[] browseQueueWithJms(BrokerService broker) throws > Exception { > + Object[] messages = null; > + Connection connection = null; > + Session session = null; > + > + try { > + URI brokerUri = connector.getUri(); > + ActiveMQConnectionFactory connectionFactory = new > ActiveMQConnectionFactory(brokerUri.toString()); > + connection = connectionFactory.createConnection(); > + connection.start(); > + session = connection.createSession(false, > Session.AUTO_ACKNOWLEDGE); > + Queue destination = session.createQueue(queueName); > + QueueBrowser browser = > session.createBrowser(destination); > + List<Message> list = new ArrayList<Message>(); > + for (Enumeration<Message> enumn = > browser.getEnumeration(); enumn.hasMoreElements();) { > + list.add(enumn.nextElement()); > + } > + messages = list.toArray(); > + } > + finally { > + if (session != null) { > + session.close(); > + } > + if (connection != null) { > + connection.close(); > + } > + } > + LOG.info("+Browsed with JMS: " + messages.length); > + > + return messages; > } > - > - public static Test suite() { > - return suite(BrokerNetworkWithStuckMessagesTest.class); > + > + private Object[] browseQueueWithJmx(BrokerService broker) throws > Exception { > + Hashtable<String, String> params = new Hashtable<String, > String>(); > + params.put("BrokerName", broker.getBrokerName()); > + params.put("Type", "Queue"); > + params.put("Destination", queueName); > + ObjectName queueObjectName = ObjectName.getInstance(amqDomain, > params); > + > + ManagementContext mgmtCtx = broker.getManagementContext(); > + MBeanServer mbs = mgmtCtx.getMBeanServer(); > + Object[] messages = (Object[]) mbs.invoke(queueObjectName, > "browse", new Object[0], new String[0]); > + > + LOG.info("+Browsed with JMX: " + messages.length); > + > + return messages; > + } > + > + protected ConnectionInfo createConnectionInfo() throws Exception { > + ConnectionInfo info = new ConnectionInfo(); > + info.setConnectionId(new ConnectionId("connection:" + > (++idGenerator))); > + info.setClientId(info.getConnectionId().getValue()); > + return info; > + } > + > + protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo) > throws Exception { > + SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator); > + return info; > + } > + > + protected ProducerInfo createProducerInfo(SessionInfo sessionInfo) > throws Exception { > + ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator); > + return info; > + } > + > + protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, > ActiveMQDestination destination) throws Exception { > + ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator); > + info.setBrowser(false); > + info.setDestination(destination); > + info.setPrefetchSize(1000); > + info.setDispatchAsync(false); > + return info; > + } > + > + protected DestinationInfo createTempDestinationInfo(ConnectionInfo > connectionInfo, byte destinationType) { > + DestinationInfo info = new DestinationInfo(); > + info.setConnectionId(connectionInfo.getConnectionId()); > + info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE); > + > > info.setDestination(ActiveMQDestination.createDestination(info.getConnectionId() > + ":" + (++tempDestGenerator), destinationType)); > + return info; > + } > + > + protected ActiveMQDestination createDestinationInfo(StubConnection > connection, ConnectionInfo connectionInfo1, byte destinationType) throws > Exception { > + if ((destinationType & ActiveMQDestination.TEMP_MASK) != 0) { > + DestinationInfo info = > createTempDestinationInfo(connectionInfo1, destinationType); > + connection.send(info); > + return info.getDestination(); > + } else { > + return ActiveMQDestination.createDestination(queueName, > destinationType); > + } > + } > + > + protected Message createMessage(ProducerInfo producerInfo, > ActiveMQDestination destination, int deliveryMode) { > + Message message = createMessage(producerInfo, destination); > + message.setPersistent(deliveryMode == DeliveryMode.PERSISTENT); > + return message; > + } > + > + protected Message createMessage(ProducerInfo producerInfo, > ActiveMQDestination destination) { > + ActiveMQTextMessage message = new ActiveMQTextMessage(); > + message.setMessageId(new MessageId(producerInfo, > ++msgIdGenerator)); > + message.setDestination(destination); > + message.setPersistent(false); > + try { > + message.setText("Test Message Payload."); > + } catch (MessageNotWriteableException e) { > + } > + return message; > + } > + > + protected MessageAck createAck(ConsumerInfo consumerInfo, Message msg, > int count, byte ackType) { > + MessageAck ack = new MessageAck(); > + ack.setAckType(ackType); > + ack.setConsumerId(consumerInfo.getConsumerId()); > + ack.setDestination(msg.getDestination()); > + ack.setLastMessageId(msg.getMessageId()); > + ack.setMessageCount(count); > + return ack; > + } > + > + public Message receiveMessage(StubConnection connection) throws > InterruptedException { > + return receiveMessage(connection, maxWait); > + } > + > + public Message receiveMessage(StubConnection connection, long timeout) > throws InterruptedException { > + while (true) { > + Object o = connection.getDispatchQueue().poll(timeout, > TimeUnit.MILLISECONDS); > + > + if (o == null) { > + return null; > + } > + if (o instanceof MessageDispatch) { > + > + MessageDispatch dispatch = (MessageDispatch)o; > + if (dispatch.getMessage() == null) { > + return null; > + } > + dispatch.setMessage(dispatch.getMessage().copy()); > + > dispatch.getMessage().setRedeliveryCounter(dispatch.getRedeliveryCounter()); > + return dispatch.getMessage(); > + } > + } > } > > } > > > -- http://blog.garytully.com Open Source Integration http://fusesource.com
