I committed a fix for the protected-access compile error, but the test fails for me.
2009/11/23 Gary Tully <[email protected]> > this breaks the build, getMBeanServer() is intentionally protected in > ManagementContext. > > > > activemq_t/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java:[364,33] > getMBeanServer() has protected access in > org.apache.activemq.broker.jmx.ManagementContext > > 2009/11/23 <[email protected]> > > Author: bsnyder >> Date: Mon Nov 23 17:02:39 2009 >> New Revision: 883411 >> >> URL: http://svn.apache.org/viewvc?rev=883411&view=rev >> Log: >> Updated test for AMQ-2324 and AMQ-2484 >> >> Modified: >> >> >> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java >> >> Modified: >> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java >> URL: >> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java?rev=883411&r1=883410&r2=883411&view=diff >> >> ============================================================================== >> --- >> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java >> (original) >> +++ >> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java >> Mon Nov 23 17:02:39 2009 >> @@ -2,35 +2,102 @@ >> >> import java.io.File; >> import java.io.IOException; >> +import java.net.URI; >> +import java.net.URISyntaxException; >> +import java.util.ArrayList; >> +import java.util.Enumeration; >> +import java.util.HashMap; >> +import java.util.Hashtable; >> +import java.util.List; >> +import java.util.Map; >> +import java.util.concurrent.TimeUnit; >> >> +import javax.jms.Connection; >> import javax.jms.DeliveryMode; >> - >> -import junit.framework.Test; >> - >> +import javax.jms.MessageNotWriteableException; >> +import javax.jms.Queue; >> +import javax.jms.QueueBrowser; 
>> +import javax.jms.Session; >> +import javax.management.MBeanServer; >> +import javax.management.ObjectName; >> + >> +import junit.framework.TestCase; >> + >> +import org.apache.activemq.ActiveMQConnectionFactory; >> +import org.apache.activemq.broker.BrokerService; >> +import org.apache.activemq.broker.BrokerTestSupport; >> import org.apache.activemq.broker.StubConnection; >> +import org.apache.activemq.broker.TransportConnector; >> +import org.apache.activemq.broker.jmx.ManagementContext; >> import org.apache.activemq.command.ActiveMQDestination; >> -import org.apache.activemq.command.Command; >> +import org.apache.activemq.command.ActiveMQTextMessage; >> +import org.apache.activemq.command.ConnectionId; >> import org.apache.activemq.command.ConnectionInfo; >> import org.apache.activemq.command.ConsumerInfo; >> +import org.apache.activemq.command.DestinationInfo; >> import org.apache.activemq.command.Message; >> import org.apache.activemq.command.MessageAck; >> +import org.apache.activemq.command.MessageDispatch; >> +import org.apache.activemq.command.MessageId; >> import org.apache.activemq.command.ProducerInfo; >> import org.apache.activemq.command.SessionInfo; >> import org.apache.activemq.transport.Transport; >> -import org.apache.activemq.transport.TransportListener; >> +import org.apache.activemq.transport.TransportFactory; >> import org.apache.activemq.util.Wait; >> import org.apache.commons.io.FileUtils; >> import org.apache.commons.logging.Log; >> import org.apache.commons.logging.LogFactory; >> >> -public class BrokerNetworkWithStuckMessagesTest extends >> NetworkTestSupport { >> - >> +/** >> + * This class duplicates most of the functionality in {...@link >> NetworkTestSupport} >> + * and {...@link BrokerTestSupport} because more control was needed over how >> brokers >> + * and connectors are created. Also, this test asserts message counts via >> JMX on >> + * each broker. 
>> + * >> + * @author bsnyder >> + * >> + */ >> +public class BrokerNetworkWithStuckMessagesTest extends TestCase >> /*NetworkTestSupport*/ { >> + >> private static final Log LOG = >> LogFactory.getLog(BrokerNetworkWithStuckMessagesTest.class); >> - >> - private DemandForwardingBridge bridge; >> - >> - protected void setUp() throws Exception { >> - super.setUp(); >> + >> + private BrokerService localBroker; >> + private BrokerService remoteBroker; >> + private DemandForwardingBridge bridge; >> + >> + protected Map<String, BrokerService> brokers = new HashMap<String, >> BrokerService>(); >> + protected ArrayList connections = new ArrayList(); >> + >> + protected TransportConnector connector; >> + protected TransportConnector remoteConnector; >> + >> + protected long idGenerator; >> + protected int msgIdGenerator; >> + protected int tempDestGenerator; >> + protected int maxWait = 4000; >> + protected String queueName = "TEST"; >> + >> + protected String amqDomain = "org.apache.activemq"; >> + >> + protected void setUp() throws Exception { >> + >> + // For those who want visual confirmation: >> + // Uncomment the following to enable JMX support on a port >> number to use >> + // Jconsole to view each broker. You will need to add some >> calls to >> + // Thread.sleep() to be able to actually slow things down so >> that you >> + // can manually see JMX attrs. 
>> +// System.setProperty("com.sun.management.jmxremote", ""); >> +// System.setProperty("com.sun.management.jmxremote.port", >> "1099"); >> +// System.setProperty("com.sun.management.jmxremote.authenticate", >> "false"); >> +// System.setProperty("com.sun.management.jmxremote.ssl", >> "false"); >> + >> + // Create the local broker >> + createBroker(); >> + // Create the remote broker >> + createRemoteBroker(); >> + >> + // Remove the activemq-data directory from the creation of the >> remote broker >> + FileUtils.deleteDirectory(new File("activemq-data")); >> >> // Create a network bridge between the local and remote brokers so >> that >> // demand-based forwarding can take place >> @@ -39,79 +106,42 @@ >> config.setDispatchAsync(false); >> >> Transport localTransport = createTransport(); >> - localTransport.setTransportListener(new TransportListener() { >> - Command command = null; >> - public void onCommand(Object o) { >> - this.command = (Command) o; >> - LOG.info("Command from [" + >> command.getFrom() + "] to [" + command.getTo() + "]"); >> - } >> - >> - public void onException(IOException error) { >> - LOG.info("Command from [" + >> command.getFrom() + "] to [" + command.getTo() + "]"); >> - LOG.info("Exception: " + error); >> - } >> - >> - public void transportInterupted() { >> - LOG.info("Interruption on local >> transport"); >> - } >> - >> - public void transportResumed() { >> - LOG.info("Resumption on local transport"); >> - } >> - }); >> - >> Transport remoteTransport = createRemoteTransport(); >> - remoteTransport.setTransportListener(new TransportListener() { >> - Command command = null; >> - public void onCommand(Object o) { >> - this.command = (Command) o; >> - LOG.info("Command from [" + >> command.getFrom() + "] to [" + command.getTo() + "]"); >> - } >> - >> - public void onException(IOException error) { >> - LOG.info("Command from [" + >> command.getFrom() + "] to [" + command.getTo() + "]"); >> - LOG.info("Exception: " + error); >> - } >> - >> - 
public void transportInterupted() { >> - LOG.info("Interruption on remote >> transport"); >> - } >> - >> - public void transportResumed() { >> - LOG.info("Resumption on remote >> transport"); >> - } >> - }); >> >> + // Create a network bridge between the two brokers >> bridge = new DemandForwardingBridge(config, localTransport, >> remoteTransport); >> - bridge.setBrokerService(broker); >> + bridge.setBrokerService(localBroker); >> bridge.start(); >> >> - // Enable JMX support on the local and remote brokers >> -// broker.setUseJmx(true); >> -// remoteBroker.setUseJmx(true); >> - >> - // Make sure persistence is disabled >> - broker.setPersistent(false); >> - broker.setPersistenceAdapter(null); >> - remoteBroker.setPersistent(false); >> - remoteBroker.setPersistenceAdapter(null); >> + waitForBridgeFormation(); >> >> - // Remove the activemq-data directory from the creation of the >> remote broker >> - FileUtils.deleteDirectory(new File("activemq-data")); >> } >> - >> - protected void tearDown() throws Exception { >> + >> + protected void waitForBridgeFormation() throws Exception { >> + for (final BrokerService broker : brokers.values()) { >> + if (!broker.getNetworkConnectors().isEmpty()) { >> + // Max wait here is 30 secs >> + Wait.waitFor(new Wait.Condition() { >> + public boolean isSatisified() throws Exception { >> + return >> !broker.getNetworkConnectors().get(0).activeBridges().isEmpty(); >> + }}); >> + } >> + } >> + } >> + >> + protected void tearDown() throws Exception { >> bridge.stop(); >> - super.tearDown(); >> + localBroker.stop(); >> + remoteBroker.stop(); >> } >> >> - public void testBrokerNetworkWithStuckMessages() throws Exception >> { >> - >> - int sendNumMessages = 10; >> - int receiveNumMessages = 5; >> - >> - // Create a producer and send a batch of 10 messages to >> the local broker >> - StubConnection connection1 = createConnection(); >> + public void testBrokerNetworkWithStuckMessages() throws Exception { >> + >> + int sendNumMessages = 10; >> 
+ int receiveNumMessages = 5; >> + >> + // Create a producer >> + StubConnection connection1 = createConnection(); >> ConnectionInfo connectionInfo1 = createConnectionInfo(); >> SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); >> ProducerInfo producerInfo = createProducerInfo(sessionInfo1); >> @@ -122,25 +152,25 @@ >> // Create a destination on the local broker >> ActiveMQDestination destinationInfo1 = null; >> >> + // Send a 10 messages to the local broker >> for (int i = 0; i < sendNumMessages; ++i) { >> - destinationInfo1 = createDestinationInfo(connection1, >> connectionInfo1, ActiveMQDestination.QUEUE_TYPE); >> -// connection1.send(createMessage(producerInfo, >> destinationInfo1, DeliveryMode.NON_PERSISTENT)); >> - connection1.request(createMessage(producerInfo, >> destinationInfo1, DeliveryMode.NON_PERSISTENT)); >> + destinationInfo1 = createDestinationInfo(connection1, >> connectionInfo1, ActiveMQDestination.QUEUE_TYPE); >> + connection1.request(createMessage(producerInfo, >> destinationInfo1, DeliveryMode.NON_PERSISTENT)); >> } >> >> // Ensure that there are 10 messages on the local broker >> - int messageCount1 = countMessagesInQueue(connection1, >> connectionInfo1, destinationInfo1); >> - assertEquals(10, messageCount1); >> + Object[] messages = browseQueueWithJmx(localBroker); >> + assertEquals(sendNumMessages, messages.length); >> >> >> - // Create a consumer on the remote broker >> + // Create a synchronous consumer on the remote broker >> final StubConnection connection2 = createRemoteConnection(); >> ConnectionInfo connectionInfo2 = createConnectionInfo(); >> SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); >> connection2.send(connectionInfo2); >> connection2.send(sessionInfo2); >> ActiveMQDestination destinationInfo2 = >> - createDestinationInfo(connection2, connectionInfo2, >> ActiveMQDestination.QUEUE_TYPE); >> + createDestinationInfo(connection2, connectionInfo2, >> ActiveMQDestination.QUEUE_TYPE); >> final 
ConsumerInfo consumerInfo2 = >> createConsumerInfo(sessionInfo2, destinationInfo2); >> connection2.send(consumerInfo2); >> >> @@ -149,32 +179,27 @@ >> // method, this will cause the messages on the local broker to be >> // forwarded to the remote broker. >> for (int i = 0; i < receiveNumMessages; ++i) { >> - assertTrue("Message " + i + " was not received", >> Wait.waitFor(new Wait.Condition() { >> - public boolean isSatisified() throws Exception { >> - Message message1 = >> receiveMessage(connection2); >> - assertNotNull(message1); >> - connection2.send(createAck(consumerInfo2, >> message1, 1, MessageAck.STANDARD_ACK_TYPE)); >> - return message1 != null; >> - } >> - })); >> -// Message message1 = receiveMessage(connection2); >> -// assertNotNull(message1); >> -// connection2.send(createAck(consumerInfo2, message1, 1, >> MessageAck.STANDARD_ACK_TYPE)); >> + Message message1 = receiveMessage(connection2); >> + assertNotNull(message1); >> + connection2.send(createAck(consumerInfo2, message1, 1, >> MessageAck.STANDARD_ACK_TYPE)); >> + >> + Object[] msgs1 = browseQueueWithJmx(remoteBroker); >> + LOG.info("Found [" + msgs1.length + "] messages with JMX"); >> +// assertEquals((sendNumMessages-i), msgs.length); >> } >> >> - // Close the consumer on the remote broker >> - connection2.send(consumerInfo2.createRemoveCommand()); >> - >> // Ensure that there are zero messages on the local broker. This >> tells >> // us that those messages have been prefetched to the remote >> broker >> // where the demand exists. 
>> - int messageCount2 = countMessagesInQueue(connection1, >> connectionInfo1, destinationInfo1); >> -// Sometimes it fails here >> - assertEquals(0, messageCount2); >> + messages = browseQueueWithJmx(localBroker); >> + assertEquals(0, messages.length); >> + >> + // Close the consumer on the remote broker >> + connection2.send(consumerInfo2.createRemoveCommand()); >> >> // There should now be 5 messages stuck on the remote broker >> - int messageCount3 = countMessagesInQueue(connection2, >> connectionInfo2, destinationInfo2); >> - assertEquals(5, messageCount3); >> + messages = browseQueueWithJmx(remoteBroker); >> + assertEquals(5, messages.length); >> >> // Create a consumer on the local broker just to confirm that it >> doesn't >> // receive any messages >> @@ -182,13 +207,13 @@ >> connection1.send(consumerInfo1); >> Message message1 = receiveMessage(connection1); >> >> - ////////////////////////////////////////////////////// >> + ////////////////////////////////////////////////////// >> // An assertNull() is done here because this is currently the >> correct >> // behavior. This is actually the purpose of this test - to prove >> that >> // messages are stuck on the remote broker. AMQ-2324 and AMQ-2484 >> aim >> // to fix this situation so that messages don't get stuck. >> assertNull(message1); >> - ////////////////////////////////////////////////////// >> + ////////////////////////////////////////////////////// >> >> ConsumerInfo consumerInfo3 = createConsumerInfo(sessionInfo2, >> destinationInfo2); >> connection2.send(consumerInfo3); >> @@ -197,30 +222,247 @@ >> // to clean up the queue. 
>> int counter = 0; >> for (int i = 0; i < receiveNumMessages; ++i) { >> - message1 = receiveMessage(connection2); >> - assertNotNull(message1); >> + message1 = receiveMessage(connection2); >> + assertNotNull(message1); >> connection2.send(createAck(consumerInfo3, message1, 1, >> MessageAck.STANDARD_ACK_TYPE)); >> ++counter; >> } >> // Ensure that 5 messages were received >> assertEquals(receiveNumMessages, counter); >> >> - Thread.sleep(2000); >> + // Let those acks percolate... This stinks but it's the only way >> currently >> + // because these types of internal broker actions are >> non-deterministic. >> + Thread.sleep(4000); >> >> // Ensure that the queue on the remote broker is empty >> - int messageCount4 = countMessagesInQueue(connection2, >> connectionInfo2, destinationInfo1); >> -// Sometimes it fails here >> - assertEquals(0, messageCount4); >> + messages = browseQueueWithJmx(remoteBroker); >> + assertEquals(0, messages.length); >> >> // Close the consumer on the remote broker >> connection2.send(consumerInfo3.createRemoveCommand()); >> >> connection1.stop(); >> connection2.stop(); >> + } >> + >> + protected BrokerService createBroker() throws Exception { >> + localBroker = new BrokerService(); >> + localBroker.setBrokerName("localhost"); >> + localBroker.setUseJmx(true); >> + localBroker.setPersistenceAdapter(null); >> + localBroker.setPersistent(false); >> + connector = createConnector(); >> + localBroker.addConnector(connector); >> + localBroker.start(); >> + localBroker.waitUntilStarted(); >> + >> + localBroker.getManagementContext().setConnectorPort(2221); >> + >> + brokers.put(localBroker.getBrokerName(), localBroker); >> + >> + return localBroker; >> + } >> + >> + protected BrokerService createRemoteBroker() throws Exception { >> + remoteBroker = new BrokerService(); >> + remoteBroker.setBrokerName("remotehost"); >> + remoteBroker.setUseJmx(true); >> + remoteBroker.setPersistenceAdapter(null); >> + remoteBroker.setPersistent(false); >> + 
remoteConnector = createRemoteConnector(); >> + remoteBroker.addConnector(remoteConnector); >> + remoteBroker.waitUntilStarted(); >> + >> + remoteBroker.getManagementContext().setConnectorPort(2222); >> + >> + brokers.put(remoteBroker.getBrokerName(), remoteBroker); >> + >> + return remoteBroker; >> + } >> + >> + protected Transport createTransport() throws Exception { >> + Transport transport = >> TransportFactory.connect(connector.getServer().getConnectURI()); >> + return transport; >> + } >> + >> + protected Transport createRemoteTransport() throws Exception { >> + Transport transport = >> TransportFactory.connect(remoteConnector.getServer().getConnectURI()); >> + return transport; >> + } >> + >> + protected TransportConnector createConnector() throws Exception, >> IOException, URISyntaxException { >> + return new TransportConnector(TransportFactory.bind(new >> URI(getLocalURI()))); >> + } >> + >> + protected TransportConnector createRemoteConnector() throws >> Exception, IOException, URISyntaxException { >> + return new TransportConnector(TransportFactory.bind(new >> URI(getRemoteURI()))); >> + } >> + >> + protected String getRemoteURI() { >> + return "vm://remotehost"; >> + } >> + >> + protected String getLocalURI() { >> + return "vm://localhost"; >> + } >> + >> + protected StubConnection createConnection() throws Exception { >> + Transport transport = >> TransportFactory.connect(connector.getServer().getConnectURI()); >> + StubConnection connection = new StubConnection(transport); >> + connections.add(connection); >> + return connection; >> + } >> + >> + protected StubConnection createRemoteConnection() throws Exception { >> + Transport transport = >> TransportFactory.connect(remoteConnector.getServer().getConnectURI()); >> + StubConnection connection = new StubConnection(transport); >> + connections.add(connection); >> + return connection; >> + } >> + >> + @SuppressWarnings("unchecked") >> + private Object[] browseQueueWithJms(BrokerService broker) throws >> 
Exception { >> + Object[] messages = null; >> + Connection connection = null; >> + Session session = null; >> + >> + try { >> + URI brokerUri = connector.getUri(); >> + ActiveMQConnectionFactory connectionFactory = new >> ActiveMQConnectionFactory(brokerUri.toString()); >> + connection = connectionFactory.createConnection(); >> + connection.start(); >> + session = connection.createSession(false, >> Session.AUTO_ACKNOWLEDGE); >> + Queue destination = >> session.createQueue(queueName); >> + QueueBrowser browser = >> session.createBrowser(destination); >> + List<Message> list = new ArrayList<Message>(); >> + for (Enumeration<Message> enumn = >> browser.getEnumeration(); enumn.hasMoreElements();) { >> + list.add(enumn.nextElement()); >> + } >> + messages = list.toArray(); >> + } >> + finally { >> + if (session != null) { >> + session.close(); >> + } >> + if (connection != null) { >> + connection.close(); >> + } >> + } >> + LOG.info("+Browsed with JMS: " + messages.length); >> + >> + return messages; >> } >> - >> - public static Test suite() { >> - return suite(BrokerNetworkWithStuckMessagesTest.class); >> + >> + private Object[] browseQueueWithJmx(BrokerService broker) throws >> Exception { >> + Hashtable<String, String> params = new Hashtable<String, >> String>(); >> + params.put("BrokerName", broker.getBrokerName()); >> + params.put("Type", "Queue"); >> + params.put("Destination", queueName); >> + ObjectName queueObjectName = ObjectName.getInstance(amqDomain, >> params); >> + >> + ManagementContext mgmtCtx = broker.getManagementContext(); >> + MBeanServer mbs = mgmtCtx.getMBeanServer(); >> + Object[] messages = (Object[]) mbs.invoke(queueObjectName, >> "browse", new Object[0], new String[0]); >> + >> + LOG.info("+Browsed with JMX: " + messages.length); >> + >> + return messages; >> + } >> + >> + protected ConnectionInfo createConnectionInfo() throws Exception { >> + ConnectionInfo info = new ConnectionInfo(); >> + info.setConnectionId(new ConnectionId("connection:" + 
>> (++idGenerator))); >> + info.setClientId(info.getConnectionId().getValue()); >> + return info; >> + } >> + >> + protected SessionInfo createSessionInfo(ConnectionInfo >> connectionInfo) throws Exception { >> + SessionInfo info = new SessionInfo(connectionInfo, >> ++idGenerator); >> + return info; >> + } >> + >> + protected ProducerInfo createProducerInfo(SessionInfo sessionInfo) >> throws Exception { >> + ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator); >> + return info; >> + } >> + >> + protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, >> ActiveMQDestination destination) throws Exception { >> + ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator); >> + info.setBrowser(false); >> + info.setDestination(destination); >> + info.setPrefetchSize(1000); >> + info.setDispatchAsync(false); >> + return info; >> + } >> + >> + protected DestinationInfo createTempDestinationInfo(ConnectionInfo >> connectionInfo, byte destinationType) { >> + DestinationInfo info = new DestinationInfo(); >> + info.setConnectionId(connectionInfo.getConnectionId()); >> + info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE); >> + >> >> info.setDestination(ActiveMQDestination.createDestination(info.getConnectionId() >> + ":" + (++tempDestGenerator), destinationType)); >> + return info; >> + } >> + >> + protected ActiveMQDestination createDestinationInfo(StubConnection >> connection, ConnectionInfo connectionInfo1, byte destinationType) throws >> Exception { >> + if ((destinationType & ActiveMQDestination.TEMP_MASK) != 0) { >> + DestinationInfo info = >> createTempDestinationInfo(connectionInfo1, destinationType); >> + connection.send(info); >> + return info.getDestination(); >> + } else { >> + return ActiveMQDestination.createDestination(queueName, >> destinationType); >> + } >> + } >> + >> + protected Message createMessage(ProducerInfo producerInfo, >> ActiveMQDestination destination, int deliveryMode) { >> + Message message = 
createMessage(producerInfo, destination); >> + message.setPersistent(deliveryMode == DeliveryMode.PERSISTENT); >> + return message; >> + } >> + >> + protected Message createMessage(ProducerInfo producerInfo, >> ActiveMQDestination destination) { >> + ActiveMQTextMessage message = new ActiveMQTextMessage(); >> + message.setMessageId(new MessageId(producerInfo, >> ++msgIdGenerator)); >> + message.setDestination(destination); >> + message.setPersistent(false); >> + try { >> + message.setText("Test Message Payload."); >> + } catch (MessageNotWriteableException e) { >> + } >> + return message; >> + } >> + >> + protected MessageAck createAck(ConsumerInfo consumerInfo, Message >> msg, int count, byte ackType) { >> + MessageAck ack = new MessageAck(); >> + ack.setAckType(ackType); >> + ack.setConsumerId(consumerInfo.getConsumerId()); >> + ack.setDestination(msg.getDestination()); >> + ack.setLastMessageId(msg.getMessageId()); >> + ack.setMessageCount(count); >> + return ack; >> + } >> + >> + public Message receiveMessage(StubConnection connection) throws >> InterruptedException { >> + return receiveMessage(connection, maxWait); >> + } >> + >> + public Message receiveMessage(StubConnection connection, long >> timeout) throws InterruptedException { >> + while (true) { >> + Object o = connection.getDispatchQueue().poll(timeout, >> TimeUnit.MILLISECONDS); >> + >> + if (o == null) { >> + return null; >> + } >> + if (o instanceof MessageDispatch) { >> + >> + MessageDispatch dispatch = (MessageDispatch)o; >> + if (dispatch.getMessage() == null) { >> + return null; >> + } >> + dispatch.setMessage(dispatch.getMessage().copy()); >> + >> dispatch.getMessage().setRedeliveryCounter(dispatch.getRedeliveryCounter()); >> + return dispatch.getMessage(); >> + } >> + } >> } >> >> } >> >> >> > > > -- > http://blog.garytully.com > > Open Source Integration > http://fusesource.com > -- http://blog.garytully.com Open Source Integration http://fusesource.com
