Author: bsnyder
Date: Mon Nov 23 17:02:39 2009
New Revision: 883411
URL: http://svn.apache.org/viewvc?rev=883411&view=rev
Log:
Updated test for AMQ-2324 and AMQ-2484
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java?rev=883411&r1=883410&r2=883411&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
Mon Nov 23 17:02:39 2009
@@ -2,35 +2,102 @@
import java.io.File;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
import javax.jms.DeliveryMode;
-
-import junit.framework.Test;
-
+import javax.jms.MessageNotWriteableException;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerTestSupport;
import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.Wait;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-public class BrokerNetworkWithStuckMessagesTest extends NetworkTestSupport {
-
+/**
+ * This class duplicates most of the functionality in {@link NetworkTestSupport}
+ * and {@link BrokerTestSupport} because more control was needed over how brokers
+ * and connectors are created. Also, this test asserts message counts via JMX on
+ * each broker.
+ *
+ * @author bsnyder
+ *
+ */
+public class BrokerNetworkWithStuckMessagesTest extends TestCase
/*NetworkTestSupport*/ {
+
private static final Log LOG =
LogFactory.getLog(BrokerNetworkWithStuckMessagesTest.class);
-
- private DemandForwardingBridge bridge;
-
- protected void setUp() throws Exception {
- super.setUp();
+
+ private BrokerService localBroker;
+ private BrokerService remoteBroker;
+ private DemandForwardingBridge bridge;
+
+ protected Map<String, BrokerService> brokers = new HashMap<String,
BrokerService>();
+ protected ArrayList connections = new ArrayList();
+
+ protected TransportConnector connector;
+ protected TransportConnector remoteConnector;
+
+ protected long idGenerator;
+ protected int msgIdGenerator;
+ protected int tempDestGenerator;
+ protected int maxWait = 4000;
+ protected String queueName = "TEST";
+
+ protected String amqDomain = "org.apache.activemq";
+
+ protected void setUp() throws Exception {
+
+ // For those who want visual confirmation:
+ // Uncomment the following to enable JMX support on a fixed port so that
+ // JConsole can be used to view each broker. You will also need to add some
+ // calls to Thread.sleep() to slow things down enough that you can
+ // inspect the JMX attributes manually.
+// System.setProperty("com.sun.management.jmxremote", "");
+// System.setProperty("com.sun.management.jmxremote.port", "1099");
+// System.setProperty("com.sun.management.jmxremote.authenticate",
"false");
+// System.setProperty("com.sun.management.jmxremote.ssl", "false");
+
+ // Create the local broker
+ createBroker();
+ // Create the remote broker
+ createRemoteBroker();
+
+ // Remove the activemq-data directory from the creation of the remote
broker
+ FileUtils.deleteDirectory(new File("activemq-data"));
// Create a network bridge between the local and remote brokers so
that
// demand-based forwarding can take place
@@ -39,79 +106,42 @@
config.setDispatchAsync(false);
Transport localTransport = createTransport();
- localTransport.setTransportListener(new TransportListener() {
- Command command = null;
- public void onCommand(Object o) {
- this.command = (Command) o;
- LOG.info("Command from [" + command.getFrom() +
"] to [" + command.getTo() + "]");
- }
-
- public void onException(IOException error) {
- LOG.info("Command from [" + command.getFrom() +
"] to [" + command.getTo() + "]");
- LOG.info("Exception: " + error);
- }
-
- public void transportInterupted() {
- LOG.info("Interruption on local transport");
- }
-
- public void transportResumed() {
- LOG.info("Resumption on local transport");
- }
- });
-
Transport remoteTransport = createRemoteTransport();
- remoteTransport.setTransportListener(new TransportListener() {
- Command command = null;
- public void onCommand(Object o) {
- this.command = (Command) o;
- LOG.info("Command from [" + command.getFrom() +
"] to [" + command.getTo() + "]");
- }
-
- public void onException(IOException error) {
- LOG.info("Command from [" + command.getFrom() +
"] to [" + command.getTo() + "]");
- LOG.info("Exception: " + error);
- }
-
- public void transportInterupted() {
- LOG.info("Interruption on remote transport");
- }
-
- public void transportResumed() {
- LOG.info("Resumption on remote transport");
- }
- });
+ // Create a network bridge between the two brokers
bridge = new DemandForwardingBridge(config, localTransport,
remoteTransport);
- bridge.setBrokerService(broker);
+ bridge.setBrokerService(localBroker);
bridge.start();
- // Enable JMX support on the local and remote brokers
-// broker.setUseJmx(true);
-// remoteBroker.setUseJmx(true);
-
- // Make sure persistence is disabled
- broker.setPersistent(false);
- broker.setPersistenceAdapter(null);
- remoteBroker.setPersistent(false);
- remoteBroker.setPersistenceAdapter(null);
+ waitForBridgeFormation();
- // Remove the activemq-data directory from the creation of the remote
broker
- FileUtils.deleteDirectory(new File("activemq-data"));
}
-
- protected void tearDown() throws Exception {
+
+ protected void waitForBridgeFormation() throws Exception {
+ for (final BrokerService broker : brokers.values()) {
+ if (!broker.getNetworkConnectors().isEmpty()) {
+ // Max wait here is 30 secs
+ Wait.waitFor(new Wait.Condition() {
+ public boolean isSatisified() throws Exception {
+ return
!broker.getNetworkConnectors().get(0).activeBridges().isEmpty();
+ }});
+ }
+ }
+ }
+
+ protected void tearDown() throws Exception {
bridge.stop();
- super.tearDown();
+ localBroker.stop();
+ remoteBroker.stop();
}
- public void testBrokerNetworkWithStuckMessages() throws Exception {
-
- int sendNumMessages = 10;
- int receiveNumMessages = 5;
-
- // Create a producer and send a batch of 10 messages to the
local broker
- StubConnection connection1 = createConnection();
+ public void testBrokerNetworkWithStuckMessages() throws Exception {
+
+ int sendNumMessages = 10;
+ int receiveNumMessages = 5;
+
+ // Create a producer
+ StubConnection connection1 = createConnection();
ConnectionInfo connectionInfo1 = createConnectionInfo();
SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
@@ -122,25 +152,25 @@
// Create a destination on the local broker
ActiveMQDestination destinationInfo1 = null;
+ // Send 10 messages to the local broker
for (int i = 0; i < sendNumMessages; ++i) {
- destinationInfo1 = createDestinationInfo(connection1,
connectionInfo1, ActiveMQDestination.QUEUE_TYPE);
-// connection1.send(createMessage(producerInfo, destinationInfo1,
DeliveryMode.NON_PERSISTENT));
- connection1.request(createMessage(producerInfo,
destinationInfo1, DeliveryMode.NON_PERSISTENT));
+ destinationInfo1 = createDestinationInfo(connection1,
connectionInfo1, ActiveMQDestination.QUEUE_TYPE);
+ connection1.request(createMessage(producerInfo, destinationInfo1,
DeliveryMode.NON_PERSISTENT));
}
// Ensure that there are 10 messages on the local broker
- int messageCount1 = countMessagesInQueue(connection1, connectionInfo1,
destinationInfo1);
- assertEquals(10, messageCount1);
+ Object[] messages = browseQueueWithJmx(localBroker);
+ assertEquals(sendNumMessages, messages.length);
- // Create a consumer on the remote broker
+ // Create a synchronous consumer on the remote broker
final StubConnection connection2 = createRemoteConnection();
ConnectionInfo connectionInfo2 = createConnectionInfo();
SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
connection2.send(connectionInfo2);
connection2.send(sessionInfo2);
ActiveMQDestination destinationInfo2 =
- createDestinationInfo(connection2, connectionInfo2,
ActiveMQDestination.QUEUE_TYPE);
+ createDestinationInfo(connection2, connectionInfo2,
ActiveMQDestination.QUEUE_TYPE);
final ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2,
destinationInfo2);
connection2.send(consumerInfo2);
@@ -149,32 +179,27 @@
// method, this will cause the messages on the local broker to be
// forwarded to the remote broker.
for (int i = 0; i < receiveNumMessages; ++i) {
- assertTrue("Message " + i + " was not received",
Wait.waitFor(new Wait.Condition() {
- public boolean isSatisified() throws Exception {
- Message message1 = receiveMessage(connection2);
- assertNotNull(message1);
- connection2.send(createAck(consumerInfo2, message1,
1, MessageAck.STANDARD_ACK_TYPE));
- return message1 != null;
- }
- }));
-// Message message1 = receiveMessage(connection2);
-// assertNotNull(message1);
-// connection2.send(createAck(consumerInfo2, message1, 1,
MessageAck.STANDARD_ACK_TYPE));
+ Message message1 = receiveMessage(connection2);
+ assertNotNull(message1);
+ connection2.send(createAck(consumerInfo2, message1, 1,
MessageAck.STANDARD_ACK_TYPE));
+
+ Object[] msgs1 = browseQueueWithJmx(remoteBroker);
+ LOG.info("Found [" + msgs1.length + "] messages with JMX");
+// assertEquals((sendNumMessages-i), msgs.length);
}
- // Close the consumer on the remote broker
- connection2.send(consumerInfo2.createRemoveCommand());
-
// Ensure that there are zero messages on the local broker. This tells
// us that those messages have been prefetched to the remote broker
// where the demand exists.
- int messageCount2 = countMessagesInQueue(connection1, connectionInfo1,
destinationInfo1);
-// Sometimes it fails here
- assertEquals(0, messageCount2);
+ messages = browseQueueWithJmx(localBroker);
+ assertEquals(0, messages.length);
+
+ // Close the consumer on the remote broker
+ connection2.send(consumerInfo2.createRemoveCommand());
// There should now be 5 messages stuck on the remote broker
- int messageCount3 = countMessagesInQueue(connection2, connectionInfo2,
destinationInfo2);
- assertEquals(5, messageCount3);
+ messages = browseQueueWithJmx(remoteBroker);
+ assertEquals(5, messages.length);
// Create a consumer on the local broker just to confirm that it
doesn't
// receive any messages
@@ -182,13 +207,13 @@
connection1.send(consumerInfo1);
Message message1 = receiveMessage(connection1);
- //////////////////////////////////////////////////////
+ //////////////////////////////////////////////////////
// An assertNull() is done here because this is currently the correct
// behavior. This is actually the purpose of this test - to prove that
// messages are stuck on the remote broker. AMQ-2324 and AMQ-2484 aim
// to fix this situation so that messages don't get stuck.
assertNull(message1);
- //////////////////////////////////////////////////////
+ //////////////////////////////////////////////////////
ConsumerInfo consumerInfo3 = createConsumerInfo(sessionInfo2,
destinationInfo2);
connection2.send(consumerInfo3);
@@ -197,30 +222,247 @@
// to clean up the queue.
int counter = 0;
for (int i = 0; i < receiveNumMessages; ++i) {
- message1 = receiveMessage(connection2);
- assertNotNull(message1);
+ message1 = receiveMessage(connection2);
+ assertNotNull(message1);
connection2.send(createAck(consumerInfo3, message1, 1,
MessageAck.STANDARD_ACK_TYPE));
++counter;
}
// Ensure that 5 messages were received
assertEquals(receiveNumMessages, counter);
- Thread.sleep(2000);
+ // Let those acks percolate... This stinks but it's the only way
currently
+ // because these types of internal broker actions are
non-deterministic.
+ Thread.sleep(4000);
// Ensure that the queue on the remote broker is empty
- int messageCount4 = countMessagesInQueue(connection2, connectionInfo2,
destinationInfo1);
-// Sometimes it fails here
- assertEquals(0, messageCount4);
+ messages = browseQueueWithJmx(remoteBroker);
+ assertEquals(0, messages.length);
// Close the consumer on the remote broker
connection2.send(consumerInfo3.createRemoveCommand());
connection1.stop();
connection2.stop();
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ localBroker = new BrokerService();
+ localBroker.setBrokerName("localhost");
+ localBroker.setUseJmx(true);
+ localBroker.setPersistenceAdapter(null);
+ localBroker.setPersistent(false);
+ connector = createConnector();
+ localBroker.addConnector(connector);
+ localBroker.start();
+ localBroker.waitUntilStarted();
+
+ localBroker.getManagementContext().setConnectorPort(2221);
+
+ brokers.put(localBroker.getBrokerName(), localBroker);
+
+ return localBroker;
+ }
+
+ protected BrokerService createRemoteBroker() throws Exception {
+ remoteBroker = new BrokerService();
+ remoteBroker.setBrokerName("remotehost");
+ remoteBroker.setUseJmx(true);
+ remoteBroker.setPersistenceAdapter(null);
+ remoteBroker.setPersistent(false);
+ remoteConnector = createRemoteConnector();
+ remoteBroker.addConnector(remoteConnector);
+ remoteBroker.waitUntilStarted();
+
+ remoteBroker.getManagementContext().setConnectorPort(2222);
+
+ brokers.put(remoteBroker.getBrokerName(), remoteBroker);
+
+ return remoteBroker;
+ }
+
+ protected Transport createTransport() throws Exception {
+ Transport transport =
TransportFactory.connect(connector.getServer().getConnectURI());
+ return transport;
+ }
+
+ protected Transport createRemoteTransport() throws Exception {
+ Transport transport =
TransportFactory.connect(remoteConnector.getServer().getConnectURI());
+ return transport;
+ }
+
+ protected TransportConnector createConnector() throws Exception,
IOException, URISyntaxException {
+ return new TransportConnector(TransportFactory.bind(new
URI(getLocalURI())));
+ }
+
+ protected TransportConnector createRemoteConnector() throws Exception,
IOException, URISyntaxException {
+ return new TransportConnector(TransportFactory.bind(new
URI(getRemoteURI())));
+ }
+
+ protected String getRemoteURI() {
+ return "vm://remotehost";
+ }
+
+ protected String getLocalURI() {
+ return "vm://localhost";
+ }
+
+ protected StubConnection createConnection() throws Exception {
+ Transport transport =
TransportFactory.connect(connector.getServer().getConnectURI());
+ StubConnection connection = new StubConnection(transport);
+ connections.add(connection);
+ return connection;
+ }
+
+ protected StubConnection createRemoteConnection() throws Exception {
+ Transport transport =
TransportFactory.connect(remoteConnector.getServer().getConnectURI());
+ StubConnection connection = new StubConnection(transport);
+ connections.add(connection);
+ return connection;
+ }
+
+ @SuppressWarnings("unchecked")
+ private Object[] browseQueueWithJms(BrokerService broker) throws Exception
{
+ Object[] messages = null;
+ Connection connection = null;
+ Session session = null;
+
+ try {
+ URI brokerUri = connector.getUri();
+ ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(brokerUri.toString());
+ connection = connectionFactory.createConnection();
+ connection.start();
+ session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue destination = session.createQueue(queueName);
+ QueueBrowser browser =
session.createBrowser(destination);
+ List<Message> list = new ArrayList<Message>();
+ for (Enumeration<Message> enumn =
browser.getEnumeration(); enumn.hasMoreElements();) {
+ list.add(enumn.nextElement());
+ }
+ messages = list.toArray();
+ }
+ finally {
+ if (session != null) {
+ session.close();
+ }
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ LOG.info("+Browsed with JMS: " + messages.length);
+
+ return messages;
}
-
- public static Test suite() {
- return suite(BrokerNetworkWithStuckMessagesTest.class);
+
+ private Object[] browseQueueWithJmx(BrokerService broker) throws Exception
{
+ Hashtable<String, String> params = new Hashtable<String, String>();
+ params.put("BrokerName", broker.getBrokerName());
+ params.put("Type", "Queue");
+ params.put("Destination", queueName);
+ ObjectName queueObjectName = ObjectName.getInstance(amqDomain, params);
+
+ ManagementContext mgmtCtx = broker.getManagementContext();
+ MBeanServer mbs = mgmtCtx.getMBeanServer();
+ Object[] messages = (Object[]) mbs.invoke(queueObjectName, "browse",
new Object[0], new String[0]);
+
+ LOG.info("+Browsed with JMX: " + messages.length);
+
+ return messages;
+ }
+
+ protected ConnectionInfo createConnectionInfo() throws Exception {
+ ConnectionInfo info = new ConnectionInfo();
+ info.setConnectionId(new ConnectionId("connection:" +
(++idGenerator)));
+ info.setClientId(info.getConnectionId().getValue());
+ return info;
+ }
+
+ protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo)
throws Exception {
+ SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator);
+ return info;
+ }
+
+ protected ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws
Exception {
+ ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator);
+ return info;
+ }
+
+ protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo,
ActiveMQDestination destination) throws Exception {
+ ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator);
+ info.setBrowser(false);
+ info.setDestination(destination);
+ info.setPrefetchSize(1000);
+ info.setDispatchAsync(false);
+ return info;
+ }
+
+ protected DestinationInfo createTempDestinationInfo(ConnectionInfo
connectionInfo, byte destinationType) {
+ DestinationInfo info = new DestinationInfo();
+ info.setConnectionId(connectionInfo.getConnectionId());
+ info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
+
info.setDestination(ActiveMQDestination.createDestination(info.getConnectionId()
+ ":" + (++tempDestGenerator), destinationType));
+ return info;
+ }
+
+ protected ActiveMQDestination createDestinationInfo(StubConnection
connection, ConnectionInfo connectionInfo1, byte destinationType) throws
Exception {
+ if ((destinationType & ActiveMQDestination.TEMP_MASK) != 0) {
+ DestinationInfo info = createTempDestinationInfo(connectionInfo1,
destinationType);
+ connection.send(info);
+ return info.getDestination();
+ } else {
+ return ActiveMQDestination.createDestination(queueName,
destinationType);
+ }
+ }
+
+ protected Message createMessage(ProducerInfo producerInfo,
ActiveMQDestination destination, int deliveryMode) {
+ Message message = createMessage(producerInfo, destination);
+ message.setPersistent(deliveryMode == DeliveryMode.PERSISTENT);
+ return message;
+ }
+
+ protected Message createMessage(ProducerInfo producerInfo,
ActiveMQDestination destination) {
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator));
+ message.setDestination(destination);
+ message.setPersistent(false);
+ try {
+ message.setText("Test Message Payload.");
+ } catch (MessageNotWriteableException e) {
+ }
+ return message;
+ }
+
+ protected MessageAck createAck(ConsumerInfo consumerInfo, Message msg, int
count, byte ackType) {
+ MessageAck ack = new MessageAck();
+ ack.setAckType(ackType);
+ ack.setConsumerId(consumerInfo.getConsumerId());
+ ack.setDestination(msg.getDestination());
+ ack.setLastMessageId(msg.getMessageId());
+ ack.setMessageCount(count);
+ return ack;
+ }
+
+ public Message receiveMessage(StubConnection connection) throws
InterruptedException {
+ return receiveMessage(connection, maxWait);
+ }
+
+ public Message receiveMessage(StubConnection connection, long timeout)
throws InterruptedException {
+ while (true) {
+ Object o = connection.getDispatchQueue().poll(timeout,
TimeUnit.MILLISECONDS);
+
+ if (o == null) {
+ return null;
+ }
+ if (o instanceof MessageDispatch) {
+
+ MessageDispatch dispatch = (MessageDispatch)o;
+ if (dispatch.getMessage() == null) {
+ return null;
+ }
+ dispatch.setMessage(dispatch.getMessage().copy());
+
dispatch.getMessage().setRedeliveryCounter(dispatch.getRedeliveryCounter());
+ return dispatch.getMessage();
+ }
+ }
}
}