Author: jstrachan
Date: Fri Dec 30 02:24:55 2005
New Revision: 360062
URL: http://svn.apache.org/viewcvs?rev=360062&view=rev
Log:
added test case to show the configuration of prefetch sizes in Stomp using
header "activemq.prefetchSize: 1"
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscribe.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompWireFormatTest.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscribe.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscribe.java?rev=360062&r1=360061&r2=360062&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscribe.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscribe.java
Fri Dec 30 02:24:55 2005
@@ -43,7 +43,7 @@
ci.setPrefetchSize(1000);
ci.setDispatchAsync(true);
- IntrospectionSupport.setProperties(ci, headers, "activemq:");
+ IntrospectionSupport.setProperties(ci, headers, "activemq.");
ci.setDestination(DestinationNamer.convert(destination));
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompWireFormatTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompWireFormatTest.java?rev=360062&r1=360061&r2=360062&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompWireFormatTest.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompWireFormatTest.java
Fri Dec 30 02:24:55 2005
@@ -17,16 +17,18 @@
package org.apache.activemq.transport.stomp;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.*;
import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.transport.stomp.StompWireFormat;
+import javax.jms.JMSException;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.IOException;
import junit.framework.TestCase;
@@ -37,33 +39,45 @@
wire = new StompWireFormat();
}
- public void testDummy() throws Exception {
- }
-
- public void TODO_testValidConnectHandshake() throws Exception {
- String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode:
wombats\n" + "\n" + Stomp.NULL;
- DataInputStream din = new DataInputStream(new
ByteArrayInputStream(connect_frame.getBytes()));
+ public void testValidConnectHandshake() throws Exception {
ByteArrayOutputStream bout = new ByteArrayOutputStream();
DataOutputStream dout = new DataOutputStream(bout);
- ConnectionInfo ci = (ConnectionInfo) wire.readCommand(din);
+ ConnectionInfo ci = (ConnectionInfo) parseCommand("CONNECT\n" +
"login: brianm\n" + "passcode: wombats\n" + "\n" + Stomp.NULL);
assertNotNull(ci);
assertTrue(ci.isResponseRequired());
Response cr = new Response();
cr.setCorrelationId(ci.getCommandId());
- wire.writeCommand(cr, dout);
+
+ String response = writeCommand(cr);
+ System.out.println("Received: " + response);
SessionInfo si = (SessionInfo) wire.readCommand(null);
assertNotNull(si);
- assertTrue(si.isResponseRequired());
+ assertTrue(!si.isResponseRequired());
- Response sr = new Response();
- sr.setCorrelationId(si.getCommandId());
- wire.writeCommand(sr, dout);
+ ProducerInfo pi = (ProducerInfo) wire.readCommand(null);
+ assertNotNull(pi);
+ assertTrue(pi.isResponseRequired());
- String response = new String(bout.toByteArray());
- assertTrue(response.startsWith("CONNECTED"));
+ Response sr = new Response();
+ sr.setCorrelationId(pi.getCommandId());
+ response = writeCommand(sr);
+ System.out.println("Received: " + response);
+ assertTrue("Response should start with CONNECTED: " + response,
response.startsWith("CONNECTED"));
+
+ // now lets test subscribe
+ ConsumerInfo consumerInfo = (ConsumerInfo) parseCommand("SUBSCRIBE\n"
+ "destination: /queue/foo\n" + "ack: client\n" + "activemq.prefetchSize: 1\n"
+ + "\n" + Stomp.NULL);
+ assertNotNull(consumerInfo);
+ // assertTrue(consumerInfo.isResponseRequired());
+ assertEquals("prefetch size", 1, consumerInfo.getPrefetchSize());
+
+ cr = new Response();
+ cr.setCorrelationId(consumerInfo.getCommandId());
+ response = writeCommand(cr);
+ System.out.println("Received: " + response);
}
public void _testFakeServer() throws Exception {
@@ -85,4 +99,18 @@
System.err.println(System.in.read());
}
+
+ protected Command parseCommand(String connect_frame) throws IOException,
JMSException {
+ DataInputStream din = new DataInputStream(new
ByteArrayInputStream(connect_frame.getBytes()));
+
+ return wire.readCommand(din);
+ }
+
+ protected String writeCommand(Command command) throws IOException,
JMSException {
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ DataOutputStream dout = new DataOutputStream(bout);
+ wire.writeCommand(command, dout);
+ return new String(bout.toByteArray());
+ }
+
}