Author: nmittler
Date: Mon Jun 12 17:03:00 2006
New Revision: 413770
URL: http://svn.apache.org/viewvc?rev=413770&view=rev
Log:
Implementing AMQ-748 - adding optional request-id header in connect message and
corresponding response-id in the connected message. Updating StompTest to
verify.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Connect.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Connect.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Connect.java?rev=413770&r1=413769&r2=413770&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Connect.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Connect.java
Mon Jun 12 17:03:00 2006
@@ -37,7 +37,7 @@
public CommandEnvelope build(String commandLine, DataInput in) throws
IOException {
- Properties headers = headerParser.parse(in);
+ final Properties headers = headerParser.parse(in);
// allow anyone to login for now
@@ -59,48 +59,66 @@
connectionInfo.setPassword(passcode);
while (in.readByte() != 0) {
- }
+ }
- return new CommandEnvelope(connectionInfo, headers, new
ResponseListener() {
- public boolean onResponse(Response receipt, DataOutput out) throws
IOException {
+ return new CommandEnvelope(connectionInfo, headers,
+ new ConnectResponseListener(headers, connectionInfo) );
+ }
+
+ class ConnectResponseListener implements ResponseListener{
+
+ private Properties headers;
+ private ConnectionInfo connectionInfo;
+
+ public ConnectResponseListener( Properties headers, final
ConnectionInfo connectionInfo ){
+ this.headers = headers;
+ this.connectionInfo = connectionInfo;
+ }
+
+ public boolean onResponse(Response receipt, DataOutput out) throws
IOException {
- if (receipt.getCorrelationId() !=
connectionInfo.getCommandId())
- return false;
-
- final SessionInfo sessionInfo = new
SessionInfo(format.getSessionId());
- sessionInfo.setCommandId(format.generateCommandId());
- sessionInfo.setResponseRequired(false);
-
- final ProducerInfo producerInfo = new
ProducerInfo(format.getProducerId());
- producerInfo.setCommandId(format.generateCommandId());
- producerInfo.setResponseRequired(true);
-
- format.addResponseListener(new ResponseListener() {
- public boolean onResponse(Response receipt, DataOutput
out) throws IOException {
- if (receipt.getCorrelationId() !=
producerInfo.getCommandId())
- return false;
-
- format.onFullyConnected();
-
- StringBuffer buffer = new StringBuffer();
- buffer.append(Stomp.Responses.CONNECTED);
- buffer.append(Stomp.NEWLINE);
- buffer.append(Stomp.Headers.Connected.SESSION);
- buffer.append(Stomp.Headers.SEPERATOR);
- buffer.append(connectionInfo.getClientId());
- buffer.append(Stomp.NEWLINE);
- buffer.append(Stomp.NEWLINE);
- buffer.append(Stomp.NULL);
- buffer.append(Stomp.NEWLINE);
- out.writeBytes(buffer.toString());
- return true;
- }
- });
+ if (receipt.getCorrelationId() != connectionInfo.getCommandId())
+ return false;
+
+ final SessionInfo sessionInfo = new
SessionInfo(format.getSessionId());
+ sessionInfo.setCommandId(format.generateCommandId());
+ sessionInfo.setResponseRequired(false);
+
+ final ProducerInfo producerInfo = new
ProducerInfo(format.getProducerId());
+ producerInfo.setCommandId(format.generateCommandId());
+ producerInfo.setResponseRequired(true);
+
+ format.addResponseListener( new ResponseListener(){
+ public boolean onResponse(Response receipt, DataOutput out)
throws IOException {
+ if (receipt.getCorrelationId() !=
producerInfo.getCommandId() )
+ return false;
+
+ format.onFullyConnected();
+
+ StringBuffer buffer = new StringBuffer();
+ buffer.append(Stomp.Responses.CONNECTED);
+ buffer.append(Stomp.NEWLINE);
+ buffer.append(Stomp.Headers.Connected.SESSION);
+ buffer.append(Stomp.Headers.SEPERATOR);
+ buffer.append(connectionInfo.getClientId());
+ if( headers.containsKey(Stomp.Headers.Connect.REQUEST_ID)
){
+ buffer.append(Stomp.NEWLINE);
+ buffer.append(Stomp.Headers.Connected.RESPONSE_ID);
+ buffer.append(Stomp.Headers.SEPERATOR);
+ buffer.append(headers.getProperty(
Stomp.Headers.Connect.REQUEST_ID ));
+ }
+ buffer.append(Stomp.NEWLINE);
+ buffer.append(Stomp.NEWLINE);
+ buffer.append(Stomp.NULL);
+ buffer.append(Stomp.NEWLINE);
+ out.writeBytes(buffer.toString());
+ return true;
+ }
+ });
- format.addToPendingReadCommands(sessionInfo);
- format.addToPendingReadCommands(producerInfo);
- return true;
- }
- });
+ format.addToPendingReadCommands(sessionInfo);
+ format.addToPendingReadCommands(producerInfo);
+ return true;
+ }
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java?rev=413770&r1=413769&r2=413770&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
Mon Jun 12 17:03:00 2006
@@ -97,6 +97,7 @@
String LOGIN = "login";
String PASSCODE = "passcode";
String CLIENT_ID = "client-id";
+ String REQUEST_ID = "request-id";
}
public interface Error {
@@ -105,6 +106,7 @@
public interface Connected {
String SESSION = "session";
+ String RESPONSE_ID = "response-id";
}
public interface Ack {
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=413770&r1=413769&r2=413770&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Mon Jun 12 17:03:00 2006
@@ -122,11 +122,12 @@
public void testConnect() throws Exception {
- String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode:
wombats\n" + "\n" + Stomp.NULL;
+ String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode:
wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
sendFrame(connect_frame);
String f = receiveFrame(10000);
assertTrue(f.startsWith("CONNECTED"));
+ assertTrue(f.contains("response-id:1"));
}