Author: nmittler
Date: Mon Jun 12 17:03:00 2006
New Revision: 413770

URL: http://svn.apache.org/viewvc?rev=413770&view=rev
Log:
Implement AMQ-748: add an optional request-id header to the connect message and
echo it back as a corresponding response-id header in the connected message.
Update StompTest to verify the new header.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Connect.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Connect.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Connect.java?rev=413770&r1=413769&r2=413770&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Connect.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Connect.java
 Mon Jun 12 17:03:00 2006
@@ -37,7 +37,7 @@
 
     public CommandEnvelope build(String commandLine, DataInput in) throws 
IOException {
         
-        Properties headers = headerParser.parse(in);
+        final Properties headers = headerParser.parse(in);
         
         
         // allow anyone to login for now
@@ -59,48 +59,66 @@
         connectionInfo.setPassword(passcode);
 
         while (in.readByte() != 0) {
-        }
+        }               
         
-        return new CommandEnvelope(connectionInfo, headers, new 
ResponseListener() {
-            public boolean onResponse(Response receipt, DataOutput out) throws 
IOException {
+        return new CommandEnvelope(connectionInfo, headers, 
+                       new ConnectResponseListener(headers, connectionInfo) );
+    }
+    
+    class ConnectResponseListener implements ResponseListener{
+       
+       private Properties headers;
+       private ConnectionInfo connectionInfo;
+       
+       public ConnectResponseListener( Properties headers, final 
ConnectionInfo connectionInfo ){
+               this.headers = headers;
+               this.connectionInfo = connectionInfo;
+       }
+       
+       public boolean onResponse(Response receipt, DataOutput out) throws 
IOException {
                 
-                if (receipt.getCorrelationId() != 
connectionInfo.getCommandId())
-                    return false;
-                
-                final SessionInfo sessionInfo = new 
SessionInfo(format.getSessionId());
-                sessionInfo.setCommandId(format.generateCommandId());
-                sessionInfo.setResponseRequired(false);
-                
-                final ProducerInfo producerInfo = new 
ProducerInfo(format.getProducerId());
-                producerInfo.setCommandId(format.generateCommandId());
-                producerInfo.setResponseRequired(true);
-                
-                format.addResponseListener(new ResponseListener() {
-                    public boolean onResponse(Response receipt, DataOutput 
out) throws IOException {
-                        if (receipt.getCorrelationId() != 
producerInfo.getCommandId())
-                            return false;
-                        
-                        format.onFullyConnected();
-                        
-                        StringBuffer buffer = new StringBuffer();
-                        buffer.append(Stomp.Responses.CONNECTED);
-                        buffer.append(Stomp.NEWLINE);
-                        buffer.append(Stomp.Headers.Connected.SESSION);
-                        buffer.append(Stomp.Headers.SEPERATOR);
-                        buffer.append(connectionInfo.getClientId());
-                        buffer.append(Stomp.NEWLINE);
-                        buffer.append(Stomp.NEWLINE);
-                        buffer.append(Stomp.NULL);
-                        buffer.append(Stomp.NEWLINE);
-                        out.writeBytes(buffer.toString());
-                        return true;
-                    }
-                });
+            if (receipt.getCorrelationId() != connectionInfo.getCommandId())
+                return false;
+            
+            final SessionInfo sessionInfo = new 
SessionInfo(format.getSessionId());
+            sessionInfo.setCommandId(format.generateCommandId());
+            sessionInfo.setResponseRequired(false);
+            
+            final ProducerInfo producerInfo = new 
ProducerInfo(format.getProducerId());
+            producerInfo.setCommandId(format.generateCommandId());
+            producerInfo.setResponseRequired(true);                            
                  
+            
+            format.addResponseListener( new ResponseListener(){
+               public boolean onResponse(Response receipt, DataOutput out) 
throws IOException {                
+                    if (receipt.getCorrelationId() != 
producerInfo.getCommandId() )
+                        return false;
+                    
+                    format.onFullyConnected();
+                    
+                    StringBuffer buffer = new StringBuffer();
+                    buffer.append(Stomp.Responses.CONNECTED);
+                    buffer.append(Stomp.NEWLINE);
+                    buffer.append(Stomp.Headers.Connected.SESSION);            
+                    buffer.append(Stomp.Headers.SEPERATOR);
+                    buffer.append(connectionInfo.getClientId());               
     
+                    if( headers.containsKey(Stomp.Headers.Connect.REQUEST_ID) 
){
+                       buffer.append(Stomp.NEWLINE);
+                       buffer.append(Stomp.Headers.Connected.RESPONSE_ID);
+                       buffer.append(Stomp.Headers.SEPERATOR);
+                       buffer.append(headers.getProperty( 
Stomp.Headers.Connect.REQUEST_ID ));
+                       }
+                    buffer.append(Stomp.NEWLINE);
+                    buffer.append(Stomp.NEWLINE);
+                    buffer.append(Stomp.NULL);
+                    buffer.append(Stomp.NEWLINE);
+                    out.writeBytes(buffer.toString());
+                    return true;
+                }
+            });                                
 
-                format.addToPendingReadCommands(sessionInfo);
-                format.addToPendingReadCommands(producerInfo);
-                return true;
-            }
-        });
+            format.addToPendingReadCommands(sessionInfo);
+            format.addToPendingReadCommands(producerInfo);
+            return true;
+               }
     }
 }

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java?rev=413770&r1=413769&r2=413770&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
 Mon Jun 12 17:03:00 2006
@@ -97,6 +97,7 @@
             String LOGIN = "login";
             String PASSCODE = "passcode";
             String CLIENT_ID = "client-id";
+            String REQUEST_ID = "request-id";
         }
 
         public interface Error {
@@ -105,6 +106,7 @@
 
         public interface Connected {
             String SESSION = "session";
+            String RESPONSE_ID = "response-id";
         }
 
         public interface Ack {

Modified: 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=413770&r1=413769&r2=413770&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
 Mon Jun 12 17:03:00 2006
@@ -122,11 +122,12 @@
 
     public void testConnect() throws Exception {
           
-        String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: 
wombats\n" + "\n" + Stomp.NULL;
+        String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: 
wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
         sendFrame(connect_frame);
      
         String f = receiveFrame(10000);
         assertTrue(f.startsWith("CONNECTED"));
+        assertTrue(f.contains("response-id:1"));
         
     }
     


Reply via email to