Author: jlim
Date: Wed Jan 18 01:10:49 2006
New Revision: 370081
URL: http://svn.apache.org/viewcvs?rev=370081&view=rev
Log:
added additional test cases for the other stomp verbs/features
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=370081&r1=370080&r2=370081&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Wed Jan 18 01:10:49 2006
@@ -22,17 +22,19 @@
import java.io.OutputStream;
import java.net.Socket;
import java.net.URI;
+import java.net.SocketTimeoutException;
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
+
+import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
+import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.transport.stomp.Stomp;
@@ -49,6 +51,7 @@
protected void setUp() throws Exception {
broker = new BrokerService();
broker.setPersistent(false);
+
connector = broker.addConnector("stomp://localhost:0");
broker.start();
@@ -61,6 +64,8 @@
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
queue = new ActiveMQQueue("TEST");
connection.start();
+
+
}
protected void tearDown() throws Exception {
@@ -95,9 +100,17 @@
}
}
}
-
+
+ public void sendMessage(String msg) throws Exception {
+
+ MessageProducer producer = session.createProducer(queue);
+ TextMessage message = session.createTextMessage(msg);
+ producer.send(message);
+
+ }
+
public void testConnect() throws Exception {
-
+
String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "\n" + Stomp.NULL;
sendFrame(connect_frame);
@@ -119,46 +132,252 @@
frame = receiveFrame(10000);
assertTrue(frame.startsWith("CONNECTED"));
-
- frame =
- "SEND\n" +
- "destination:/queue/TEST\n\n" +
- "Hello World" +
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/TEST\n\n" +
+ "Hello World" +
Stomp.NULL;
+
sendFrame(frame);
-
+
TextMessage message = (TextMessage) consumer.receive(1000);
assertNotNull(message);
assertEquals("Hello World", message.getText());
-
+
+
}
-
+
public void testSubscribeWithAutoAck() throws Exception {
-
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
"passcode: wombats\n\n"+
Stomp.NULL;
sendFrame(frame);
-
- frame = receiveFrame(10000000);
+
+ frame = receiveFrame(100000);
assertTrue(frame.startsWith("CONNECTED"));
-
- frame =
- "SUBSCRIBE\n" +
+
+ frame =
+ "SUBSCRIBE\n" +
"destination:/queue/TEST\n" +
- "ack:auto\n\n" +
+ "ack:auto\n\n" +
Stomp.NULL;
sendFrame(frame);
- MessageProducer producer = session.createProducer(queue);
- TextMessage message = session.createTextMessage(getName());
- producer.send(message);
-
+ sendMessage(getName());
+
frame = receiveFrame(10000);
assertTrue(frame.startsWith("MESSAGE"));
-
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n"+
+ Stomp.NULL;
+ sendFrame(frame);
+
+
}
-
+
+
+ public void testSubscribeWithClientAck() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n"+
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ assertTrue(frame.startsWith("CONNECTED"));
+
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/TEST\n" +
+ "ack:client\n\n"+
+ Stomp.NULL;
+
+
+ sendFrame(frame);
+ sendMessage(getName());
+ frame = receiveFrame(10000);
+ assertTrue(frame.startsWith("MESSAGE"));
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n"+
+ Stomp.NULL;
+ sendFrame(frame);
+
+ // message should be received since message was not acknowledged
+ MessageConsumer consumer = session.createConsumer(queue);
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ assertNotNull(message);
+ assertTrue(message.getJMSRedelivered());
+
+
+
+ }
+
+ public void testUnsubscribe() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n"+
+ Stomp.NULL;
+ sendFrame(frame);
+ frame = receiveFrame(100000);
+ assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/TEST\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ //send a message to our queue
+ sendMessage(getName());
+
+
+ //receive message from socket
+ frame = receiveFrame(10000);
+ assertTrue(frame.startsWith("MESSAGE"));
+
+ //remove subscription
+ frame =
+ "UNSUBSCRIBE\n" +
+ "destination:/queue/TEST\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ //send a message to our queue
+ sendMessage(getName());
+
+
+ try {
+ frame = receiveFrame(1000);
+ fail("No message should have been received since subscription was removed");
+ }catch (SocketTimeoutException e){
+
+ }
+
+ }
+
+
+ public void testTransactionCommit() throws Exception {
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n"+
+ Stomp.NULL;
+ sendFrame(frame);
+
+ String f = receiveFrame(1000);
+ assertTrue(f.startsWith("CONNECTED"));
+
+ frame =
+ "BEGIN\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/TEST\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+
+ frame =
+ "COMMIT\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ // This test case is currently failing
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ assertNotNull(message);
+
+
+ }
+
+ public void testTransactionRollback() throws Exception {
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n"+
+ Stomp.NULL;
+ sendFrame(frame);
+
+ String f = receiveFrame(1000);
+ assertTrue(f.startsWith("CONNECTED"));
+
+ frame =
+ "BEGIN\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/TEST\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ "first message" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ //rollback first message
+ frame =
+ "ABORT\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/TEST\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ "second message" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "COMMIT\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ // This test case is currently failing
+
+ //only second msg should be received since first msg was rolled back
+ MessageConsumer consumer = session.createConsumer(queue);
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ assertNotNull(message);
+ assertEquals("second message", message.getText());
+
+
+ }
+
+
+
+
}