Author: jstrachan
Date: Tue Feb 14 04:55:35 2006
New Revision: 377713
URL: http://svn.apache.org/viewcvs?rev=377713&view=rev
Log:
added test case and support for selectors on Stomp subscriptions
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscribe.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java?rev=377713&r1=377712&r2=377713&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
Tue Feb 14 04:55:35 2006
@@ -79,6 +79,7 @@
String DESTINATION = "destination";
String ACK_MODE = "ack";
String ID = "id";
+ String SELECTOR = "selector";
public interface AckModeValues {
String AUTO = "auto";
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscribe.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscribe.java?rev=377713&r1=377712&r2=377713&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscribe.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscribe.java
Tue Feb 14 04:55:35 2006
@@ -16,14 +16,14 @@
*/
package org.apache.activemq.transport.stomp;
-import java.io.DataInput;
-import java.io.IOException;
-import java.util.Properties;
-
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.util.IntrospectionSupport;
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.Properties;
+
class Subscribe implements StompCommand {
private HeaderParser headerParser = new HeaderParser();
private StompWireFormat format;
@@ -43,6 +43,9 @@
ci.setPrefetchSize(1000);
ci.setDispatchAsync(true);
+ String selector = (String) headers.remove(Stomp.Headers.Subscribe.SELECTOR);
+ ci.setSelector(selector);
+
IntrospectionSupport.setProperties(ci, headers, "activemq.");
ci.setDestination(DestinationNamer.convert(destination));
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=377713&r1=377712&r2=377713&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Tue Feb 14 04:55:35 2006
@@ -23,6 +23,7 @@
import org.apache.activemq.command.ActiveMQQueue;
import javax.jms.Connection;
+import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
@@ -103,12 +104,16 @@
}
}
+
public void sendMessage(String msg) throws Exception {
+ sendMessage(msg, "foo", "xyz");
+ }
+ public void sendMessage(String msg, String propertyName, String propertyValue) throws JMSException {
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage(msg);
+ message.setStringProperty(propertyName, propertyValue);
producer.send(message);
-
}
public void testConnect() throws Exception {
@@ -177,8 +182,40 @@
assertNotNull(message);
assertEquals("Hello World", message.getText());
}
-
+
public void testSubscribeWithAutoAck() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n"+
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ sendMessage(getName());
+
+ frame = receiveFrame(10000);
+ assertTrue(frame.startsWith("MESSAGE"));
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n"+
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
+
+ public void testSubscribeWithAutoAckAndSelector() throws Exception {
String frame =
"CONNECT\n" +
@@ -193,22 +230,23 @@
frame =
"SUBSCRIBE\n" +
"destination:/queue/" + getQueueName() + "\n" +
+ "selector: foo = 'zzz'\n" +
"ack:auto\n\n" +
Stomp.NULL;
sendFrame(frame);
- sendMessage(getName());
+ sendMessage("Ignored message", "foo", "1234");
+ sendMessage("Real message", "foo", "zzz");
frame = receiveFrame(10000);
assertTrue(frame.startsWith("MESSAGE"));
+ assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real message") > 0);
frame =
"DISCONNECT\n" +
"\n\n"+
Stomp.NULL;
sendFrame(frame);
-
-
}