Author: chirino
Date: Fri May 31 19:56:12 2013
New Revision: 1488376
URL: http://svn.apache.org/r1488376
Log:
Related to AMQ-4563: Added test cases that select on the JMSMessageID
and fixed a bug in MessageId that was causing those selectors to fail.
Modified:
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java
activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java
Modified:
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java?rev=1488376&r1=1488375&r2=1488376&view=diff
==============================================================================
---
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java
(original)
+++
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java
Fri May 31 19:56:12 2013
@@ -80,6 +80,52 @@ public class AMQ4563Test extends AmqpTes
}
@Test(timeout = 60000)
+ public void testSelectingOnAMQPMessageID() throws Exception {
+ ActiveMQAdmin.enableJMSFrameTracing();
+ QueueImpl queue = new QueueImpl("queue://txqueue");
+ assertTrue(brokerService.isPersistent());
+
+ Connection connection = createAMQPConnection();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue("txqueue");
+ MessageProducer p = session.createProducer(destination);
+ TextMessage message = session.createTextMessage();
+ String messageText = "Hello sent at " + new
java.util.Date().toString();
+ message.setText(messageText);
+ p.send(message);
+
+ // Restart broker.
+ restartBroker(connection, session);
+ String selector = "JMSMessageID = '" + message.getJMSMessageID() + "'";
+ LOG.info("Using selector: "+selector);
+ int messagesReceived = readAllMessages(queue, selector);
+ assertEquals(1, messagesReceived);
+ }
+
+ @Test(timeout = 60000)
+ public void testSelectingOnActiveMQMessageID() throws Exception {
+ ActiveMQAdmin.enableJMSFrameTracing();
+ QueueImpl queue = new QueueImpl("queue://txqueue");
+ assertTrue(brokerService.isPersistent());
+
+ Connection connection = createAMQConnection();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue("txqueue");
+ MessageProducer p = session.createProducer(destination);
+ TextMessage message = session.createTextMessage();
+ String messageText = "Hello sent at " + new
java.util.Date().toString();
+ message.setText(messageText);
+ p.send(message);
+
+ // Restart broker.
+ restartBroker(connection, session);
+ String selector = "JMSMessageID = '" + message.getJMSMessageID() + "'";
+ LOG.info("Using selector: "+selector);
+ int messagesReceived = readAllMessages(queue, selector);
+ assertEquals(1, messagesReceived);
+ }
+
+ @Test(timeout = 60000)
public void testMessagesAreAckedAMQPProducer() throws Exception {
int messagesSent = 3;
ActiveMQAdmin.enableJMSFrameTracing();
@@ -110,11 +156,20 @@ public class AMQ4563Test extends AmqpTes
}
private int readAllMessages(QueueImpl queue) throws JMSException {
+ return readAllMessages(queue, null);
+ }
+
+ private int readAllMessages(QueueImpl queue, String selector) throws
JMSException {
Connection connection = createAMQPConnection();
try {
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
int messagesReceived = 0;
- MessageConsumer consumer = session.createConsumer(queue);
+ MessageConsumer consumer;
+ if( selector==null ) {
+ consumer = session.createConsumer(queue);
+ } else {
+ consumer = session.createConsumer(queue, selector);
+ }
Message msg = consumer.receive(5000);
while(msg != null) {
assertNotNull(msg);
@@ -186,6 +241,7 @@ public class AMQ4563Test extends AmqpTes
brokerService.setPersistenceAdapter(kaha);
brokerService.setAdvisorySupport(false);
brokerService.setUseJmx(false);
+ brokerService.setStoreOpenWireVersion(10);
openwireUri =
brokerService.addConnector("tcp://0.0.0.0:0").getPublishableConnectString();
// Setup SSL context...
Modified:
activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java?rev=1488376&r1=1488375&r2=1488376&view=diff
==============================================================================
---
activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java
(original)
+++
activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java
Fri May 31 19:56:12 2013
@@ -83,7 +83,6 @@ public class MessageId implements DataSt
*/
public void setTextView(String key) {
this.textView = key;
- this.key = key;
}
/**
@@ -128,7 +127,11 @@ public class MessageId implements DataSt
public String toString() {
if (key == null) {
if( textView!=null ) {
- key = textView;
+ if( textView.startsWith("ID:") ) {
+ key = textView;
+ } else {
+ key = "ID:"+textView;
+ }
} else {
key = producerId.toString() + ":" + producerSequenceId;
}