Author: chirino
Date: Sat Jul  1 21:18:44 2006
New Revision: 418550

URL: http://svn.apache.org/viewvc?rev=418550&view=rev
Log:
Added a new/highly refactored version of the STOMP protocol implementation.

The biggest difference between this and the previous implementation is that 
the conversion between the STOMP protocol and 
the ActiveMQ protocol happens at a Transport Filter layer instead of being done 
entirely at the WireFormat layer.

I think this has resulted in cleaner code since there's a better separation 
between marshalling/unmarshalling of 
the STOMP protocol and converting the STOMP protocol to the ActiveMQ protocol.


Added:
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ResponseHandler.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompCommand.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFactory.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFilter.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormatFactory.java
Modified:
    
incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp
    
incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp

Added: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java?rev=418550&view=auto
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java
 (added)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java
 Sat Jul  1 21:18:44 2006
@@ -0,0 +1,627 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp2;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.ProtocolException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+
+import org.apache.activeio.util.ByteArrayOutputStream;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.LongSequenceGenerator;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com";>chirino</a> 
+ */
+public class ProtocolConverter {
+       
+    private static final IdGenerator connectionIdGenerator = new IdGenerator(); // static: one generator shared by every converter instance
+    private final ConnectionId connectionId = new 
ConnectionId(connectionIdGenerator.generateId()); // ActiveMQ connection id used for this converter's ConnectionInfo and transactions
+    private final SessionId sessionId = new SessionId(connectionId, -1); // the one session that every consumer id and the producer id are derived from
+    private final ProducerId producerId = new ProducerId(sessionId, 1); // the one producer stamped on every message converted from a SEND
+    
+    private final LongSequenceGenerator consumerIdGenerator = new 
LongSequenceGenerator(); // sequence for ConsumerIds created on SUBSCRIBE
+    private final LongSequenceGenerator messageIdGenerator = new 
LongSequenceGenerator(); // sequence for MessageIds created on SEND
+    private final LongSequenceGenerator transactionIdGenerator = new 
LongSequenceGenerator(); // sequence for LocalTransactionIds created on BEGIN
+       
+    private final ConcurrentHashMap resposeHandlers = new ConcurrentHashMap(); // Integer command id -> ResponseHandler awaiting the broker's Response
+    private final ConcurrentHashMap subscriptionsByConsumerId = new 
ConcurrentHashMap(); // ConsumerId -> StompSubscription
+    private final Map transactions = new ConcurrentHashMap(); // STOMP transaction header value -> ActiveMQ TransactionId
+       private StompTransportFilter transportFilter; // both sendToStomp and sendToActiveMQ delegate to this filter
+       
+       private final Object commnadIdMutex = new Object(); // lock held while lastCommandId is read and incremented
+       private int lastCommandId; // next command id to hand out
+    private final AtomicBoolean connected = new AtomicBoolean(false); // true once the CONNECT handshake has completed, false again after DISCONNECT
+
+    protected int generateCommandId() {
+       synchronized(commnadIdMutex){
+               return lastCommandId++;
+       }
+    }
+
+    /**
+     * Creates the callback that acknowledges a STOMP frame once ActiveMQ has
+     * responded to the command it was converted into.
+     *
+     * @param command the STOMP frame being processed; its receipt header decides
+     *                whether an acknowledgement is wanted
+     * @return a handler that sends a RECEIPT frame carrying the requested
+     *         receipt id, or null when the client did not ask for a receipt
+     */
+    protected ResponseHandler createResponseHandler(StompCommand command){
+        final String receiptId = (String) command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+        // A response may not be needed.
+        if( receiptId == null ) {
+            return null;
+        }
+        return new ResponseHandler() {
+            public void onResponse(ProtocolConverter converter, Response response) throws IOException {
+                StompCommand sc = new StompCommand();
+                // The frame must be labelled as a RECEIPT; a StompCommand built with
+                // the no-arg constructor has no action at all.
+                sc.setAction(Stomp.Responses.RECEIPT);
+                sc.setHeaders(new HashMap(5));
+                sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+                transportFilter.sendToStomp(sc);
+            }
+        };
+    }
+    
+       protected void sendToActiveMQ(Command command, ResponseHandler handler) 
{
+               command.setCommandId(generateCommandId());
+               if(handler!=null) {
+                       command.setResponseRequired(true);
+                       resposeHandlers.put(new 
Integer(command.getCommandId()), handler);
+               }
+               transportFilter.sendToActiveMQ(command);
+       }
+
+       /**
+        * Passes a frame to the transport filter for delivery to the STOMP client.
+        *
+        * @param command the frame to send
+        * @throws IOException if the transport filter fails to send it
+        */
+       protected void sendToStomp(StompCommand command) throws IOException {
+           this.transportFilter.sendToStomp(command);
+       }
+
+       /**
+     * Convert a stomp command
+     * @param command
+     */
+       public void onStompCommad( StompCommand command ) throws IOException, 
JMSException {
+               try {
+                       
+                       String action = command.getAction();
+               if (action.startsWith(Stomp.Commands.SEND))
+                   onStompSend(command);
+               else if (action.startsWith(Stomp.Commands.ACK))
+                   onStompAck(command);
+               else if (action.startsWith(Stomp.Commands.BEGIN))
+                   onStompBegin(command);
+               else if (action.startsWith(Stomp.Commands.COMMIT))
+                   onStompCommit(command);
+               else if (action.startsWith(Stomp.Commands.ABORT))
+                   onStompAbort(command);
+               else if (action.startsWith(Stomp.Commands.SUBSCRIBE))
+                   onStompSubscribe(command);
+               else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE))
+                   onStompUnsubscribe(command);
+                       else if (action.startsWith(Stomp.Commands.CONNECT))
+                   onStompConnect(command);
+               else if (action.startsWith(Stomp.Commands.DISCONNECT))
+                   onStompDisconnect(command);
+               else
+                       throw new ProtocolException("Unknown STOMP action: 
"+action);
+               
+        } catch (ProtocolException e) {
+               
+               // Let the stomp client know about any protocol errors.
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               PrintWriter stream = new PrintWriter(new 
OutputStreamWriter(baos,"UTF-8"));
+               e.printStackTrace(stream);
+               stream.close();
+
+               HashMap headers = new HashMap();
+               headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
+               
+            final String receiptId = (String) 
command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+            if( receiptId != null ) {                  
+               headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+            }
+               
+               StompCommand errorMessage = new 
StompCommand(Stomp.Responses.ERROR,headers,baos.toByteArray());
+                       sendToStomp(errorMessage);
+                       
+        }
+       }
+       
+       protected void onStompSend(StompCommand command) throws IOException, 
JMSException {
+               checkConnected();
+
+       Map headers = command.getHeaders();
+        String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
+
+        ActiveMQMessage message = convertMessage(command);
+
+        message.setProducerId(producerId);
+        MessageId id = new MessageId(producerId, 
messageIdGenerator.getNextSequenceId());
+        message.setMessageId(id);
+        message.setJMSTimestamp(System.currentTimeMillis());
+
+        if (stompTx!=null) {
+               TransactionId activemqTx = (TransactionId) 
transactions.get(stompTx);
+            if (activemqTx == null)
+                throw new ProtocolException("Invalid transaction id: 
"+stompTx);
+            message.setTransactionId(activemqTx);
+        }
+               
+        message.onSend();
+               sendToActiveMQ(message, createResponseHandler(command));
+               
+       }
+       
+
+    protected void onStompAck(StompCommand command) throws ProtocolException {
+               checkConnected();
+
+       // TODO: acking with just a message id is very bogus
+       // since the same message id could have been sent to 2 different 
subscriptions
+       // on the same stomp connection. For example, when 2 subs are created 
on the same topic.
+       
+       Map headers = command.getHeaders();
+        String messageId = (String) headers.get(Stomp.Headers.Ack.MESSAGE_ID);
+        if (messageId == null)
+            throw new ProtocolException("ACK received without a message-id to 
acknowledge!");
+
+        TransactionId activemqTx=null;
+        String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
+        if (stompTx!=null) {
+               activemqTx = (TransactionId) transactions.get(stompTx);
+            if (activemqTx == null)
+                throw new ProtocolException("Invalid transaction id: 
"+stompTx);
+        }
+
+        boolean acked=false;
+        for (Iterator iter = subscriptionsByConsumerId.values().iterator(); 
iter.hasNext();) {
+                       StompSubscription sub = (StompSubscription) iter.next();
+                       MessageAck ack = sub.onStompMessageAck(messageId);
+                       if( ack!=null ) {
+                       ack.setTransactionId(activemqTx);
+                       sendToActiveMQ(ack,createResponseHandler(command));
+                       acked=true;
+                       break;
+                       }
+               }
+        
+        if( !acked )
+               throw new ProtocolException("Unexpected ACK received for 
message-id [" + messageId + "]");
+
+       }
+    
+
+       protected void onStompBegin(StompCommand command) throws 
ProtocolException {
+               checkConnected();
+
+               Map headers = command.getHeaders();
+               
+        String stompTx = (String)headers.get(Stomp.Headers.TRANSACTION);
+        
+        if (!headers.containsKey(Stomp.Headers.TRANSACTION)) {
+            throw new ProtocolException("Must specify the transaction you are 
beginning");
+        }
+        
+        if( transactions.get(stompTx)!=null  ) {
+            throw new ProtocolException("The transaction was allready started: 
"+stompTx);
+        }
+        
+        LocalTransactionId activemqTx = new LocalTransactionId(connectionId, 
transactionIdGenerator.getNextSequenceId());
+        transactions.put(stompTx, activemqTx);
+        
+        TransactionInfo tx = new TransactionInfo();
+        tx.setConnectionId(connectionId);
+        tx.setTransactionId(activemqTx);
+        tx.setType(TransactionInfo.BEGIN);
+        
+               sendToActiveMQ(tx, createResponseHandler(command));
+               
+       }
+       
+       protected void onStompCommit(StompCommand command) throws 
ProtocolException {
+               checkConnected();
+
+               Map headers = command.getHeaders();
+               
+        String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
+        if (stompTx==null) {
+            throw new ProtocolException("Must specify the transaction you are 
committing");
+        }
+        
+        TransactionId activemqTx=null;
+        if (stompTx!=null) {
+               activemqTx = (TransactionId) transactions.remove(stompTx);
+            if (activemqTx == null)
+                throw new ProtocolException("Invalid transaction id: 
"+stompTx);
+        }
+
+        TransactionInfo tx = new TransactionInfo();
+        tx.setConnectionId(connectionId);
+        tx.setTransactionId(activemqTx);
+        tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
+               
+               sendToActiveMQ(tx, createResponseHandler(command));
+       }
+
+       /**
+        * Handles an ABORT frame: forgets the named transaction and asks ActiveMQ
+        * to roll it back.
+        *
+        * @param command the ABORT frame
+        * @throws ProtocolException if not connected, if the transaction header
+        *                           is missing, or if no such transaction is
+        *                           registered
+        */
+       protected void onStompAbort(StompCommand command) throws ProtocolException {
+           checkConnected();
+           Map headers = command.getHeaders();
+
+           String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
+           if (stompTx==null) {
+               // This is the ABORT path, so the message must not talk about committing.
+               throw new ProtocolException("Must specify the transaction you are aborting");
+           }
+
+           // remove() both looks the transaction up and unregisters it.
+           TransactionId activemqTx = (TransactionId) transactions.remove(stompTx);
+           if (activemqTx == null)
+               throw new ProtocolException("Invalid transaction id: "+stompTx);
+
+           TransactionInfo tx = new TransactionInfo();
+           tx.setConnectionId(connectionId);
+           tx.setTransactionId(activemqTx);
+           tx.setType(TransactionInfo.ROLLBACK);
+
+           sendToActiveMQ(tx, createResponseHandler(command));
+
+       }
+
+       /**
+        * Handles a SUBSCRIBE frame: creates an ActiveMQ consumer for the
+        * requested destination, registers a StompSubscription for it under the
+        * new consumer id and sends the ConsumerInfo to ActiveMQ.
+        * <p>
+        * The selector header is removed from the frame's headers; remaining
+        * headers prefixed with "activemq." are applied to the ConsumerInfo as
+        * properties.
+        *
+        * @param command the SUBSCRIBE frame
+        * @throws ProtocolException if not connected, or if the destination
+        *                           header is missing or not a legal destination
+        */
+       protected void onStompSubscribe(StompCommand command) throws ProtocolException {
+           checkConnected();
+           Map headers = command.getHeaders();
+
+           String subscriptionId = (String)headers.get(Stomp.Headers.Subscribe.ID);
+           String destination = (String)headers.get(Stomp.Headers.Subscribe.DESTINATION);
+
+           // convertDestination(null) returns null, which would otherwise produce
+           // a consumer with no destination.
+           if (destination == null) {
+               throw new ProtocolException("Must specify the destination you are subscribing to");
+           }
+
+           ActiveMQDestination actual_dest = convertDestination(destination);
+           ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
+           ConsumerInfo consumerInfo = new ConsumerInfo(id);
+           consumerInfo.setPrefetchSize(1000);
+           consumerInfo.setDispatchAsync(true);
+
+           String selector = (String) headers.remove(Stomp.Headers.Subscribe.SELECTOR);
+           consumerInfo.setSelector(selector);
+
+           IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
+
+           // Set after the header-driven properties, reusing the destination
+           // that was already converted above.
+           consumerInfo.setDestination(actual_dest);
+
+           StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo);
+           stompSubscription.setDestination(actual_dest);
+
+           // Anything other than an explicit "client" ack mode means auto-ack.
+           String ackMode = (String)headers.get(Stomp.Headers.Subscribe.ACK_MODE);
+           if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
+               stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
+           } else {
+               stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
+           }
+
+           subscriptionsByConsumerId.put(id, stompSubscription);
+           sendToActiveMQ(consumerInfo, createResponseHandler(command));
+
+       }
+
+       /**
+        * Handles an UNSUBSCRIBE frame: finds the first subscription matching the
+        * given subscription id or destination, asks ActiveMQ to remove its
+        * consumer and drops the subscription from this converter.
+        *
+        * @param command the UNSUBSCRIBE frame
+        * @throws ProtocolException if not connected, if neither a subscription
+        *                           id nor a destination is given, if the
+        *                           destination is illegal, or if nothing matches
+        */
+       protected void onStompUnsubscribe(StompCommand command) throws ProtocolException {
+           checkConnected();
+           Map headers = command.getHeaders();
+
+           ActiveMQDestination destination=null;
+           Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
+           if( o!=null )
+               destination =convertDestination((String) o);
+
+           String subscriptionId = (String)headers.get(Stomp.Headers.Unsubscribe.ID);
+
+           if (subscriptionId==null && destination==null) {
+               throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
+           }
+
+           // TODO: Unsubscribing using a destination is a bit weird if multiple subscriptions
+           // are created with the same destination.  Perhaps this should be removed.
+           //
+           for (Iterator iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
+               StompSubscription sub = (StompSubscription) iter.next();
+               if (
+                   (subscriptionId!=null && subscriptionId.equals(sub.getSubscriptionId()) ) ||
+                   (destination!=null && destination.equals(sub.getDestination()) )
+               ) {
+                   sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
+                   // Forget the subscription; otherwise it stays in the map and keeps
+                   // matching later ACK and UNSUBSCRIBE frames.
+                   iter.remove();
+                   return;
+               }
+           }
+
+           throw new ProtocolException("No subscription matched.");
+       }
+
+       /**
+        * Handles a CONNECT frame by running the ActiveMQ handshake: a
+        * ConnectionInfo is sent first; when its response arrives a SessionInfo
+        * and a ProducerInfo follow; when the ProducerInfo response arrives the
+        * converter is marked connected and a CONNECTED frame is sent to the
+        * client.
+        *
+        * @param command the CONNECT frame
+        * @throws ProtocolException if this converter is already connected
+        */
+       protected void onStompConnect(StompCommand command) throws ProtocolException {
+
+           if (connected.get()) {
+               throw new ProtocolException("Allready connected.");
+           }
+
+           final Map headers = command.getHeaders();
+
+           // allow anyone to login for now
+           final String login = (String) headers.get(Stomp.Headers.Connect.LOGIN);
+           final String passcode = (String) headers.get(Stomp.Headers.Connect.PASSCODE);
+           final String clientId = (String) headers.get(Stomp.Headers.Connect.CLIENT_ID);
+
+           final ConnectionInfo connectionInfo = new ConnectionInfo();
+
+           IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
+
+           connectionInfo.setConnectionId(connectionId);
+           // Fall back to the connection id when the client did not name itself.
+           connectionInfo.setClientId(
+               clientId != null ? clientId : ""+connectionInfo.getConnectionId().toString());
+
+           connectionInfo.setResponseRequired(true);
+           connectionInfo.setUserName(login);
+           connectionInfo.setPassword(passcode);
+
+           // Last step of the handshake: runs when ActiveMQ answers the ProducerInfo.
+           final ResponseHandler producerReady = new ResponseHandler() {
+               public void onResponse(ProtocolConverter converter, Response response) throws IOException {
+
+                   connected.set(true);
+
+                   final HashMap responseHeaders = new HashMap();
+                   responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
+
+                   final String requestId = (String) headers.get(Stomp.Headers.Connect.REQUEST_ID);
+                   if (requestId != null) {
+                       responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
+                   }
+
+                   final StompCommand connectedFrame = new StompCommand();
+                   connectedFrame.setAction(Stomp.Responses.CONNECTED);
+                   connectedFrame.setHeaders(responseHeaders);
+                   sendToStomp(connectedFrame);
+               }
+           };
+
+           // First step: runs when ActiveMQ answers the ConnectionInfo.
+           sendToActiveMQ(connectionInfo, new ResponseHandler() {
+               public void onResponse(ProtocolConverter converter, Response response) throws IOException {
+                   sendToActiveMQ(new SessionInfo(sessionId), null);
+                   sendToActiveMQ(new ProducerInfo(producerId), producerReady);
+               }
+           });
+
+       }
+
+       protected void onStompDisconnect(StompCommand command) throws 
ProtocolException {
+               checkConnected();
+               sendToActiveMQ(new ShutdownInfo(), 
createResponseHandler(command));
+               connected.set(false);
+       }
+
+
+       /**
+        * Guard used by every handler that needs a completed CONNECT handshake.
+        *
+        * @throws ProtocolException if the connected flag is not set
+        */
+       protected void checkConnected() throws ProtocolException {
+           if (connected.get()) {
+               return;
+           }
+           throw new ProtocolException("Not connected.");
+       }
+
+       /**
+        * Entry point for commands arriving from ActiveMQ. A Response is routed
+        * to the handler registered for its correlation id (the handler is
+        * unregistered in the process); a MessageDispatch is routed to the
+        * subscription owning its consumer id. Anything else, or anything with no
+        * registered recipient, is ignored.
+        *
+        * @param command the command received from ActiveMQ
+        * @throws IOException if the recipient fails while writing to the client
+        * @throws JMSException if the recipient fails with a JMS error
+        */
+       public void onActiveMQCommad( Command command ) throws IOException, JMSException {
+
+           if (command.isResponse()) {
+               final Response response = (Response) command;
+               final Integer correlationKey = new Integer(response.getCorrelationId());
+               final ResponseHandler pending = (ResponseHandler) resposeHandlers.remove(correlationKey);
+               if (pending != null) {
+                   pending.onResponse(this, response);
+               }
+               return;
+           }
+
+           if (!command.isMessageDispatch()) {
+               return;
+           }
+
+           final MessageDispatch dispatch = (MessageDispatch) command;
+           final StompSubscription target =
+               (StompSubscription) subscriptionsByConsumerId.get(dispatch.getConsumerId());
+           if (target != null) {
+               target.onMessageDispatch(dispatch);
+           }
+       }
+
+       /**
+        * Converts a STOMP frame into an ActiveMQ message.
+        * <p>
+        * A frame carrying a content-length header becomes a bytes message; any
+        * other frame becomes a text message decoded as UTF-8. The destination,
+        * correlation id, expiration, priority, type, reply-to and persistence
+        * headers are removed from the frame's header map and applied to the
+        * message; whatever headers remain are set as the message properties.
+        * Note that this mutates the header map of the given frame.
+        *
+        * @param command the frame to convert
+        * @return the ActiveMQ message built from the frame
+        * @throws IOException a ProtocolException if the text body cannot be set,
+        *                     a destination name is illegal, or the expiration or
+        *                     priority header is not a number
+        * @throws JMSException if the message rejects a value
+        */
+       public  ActiveMQMessage convertMessage(StompCommand command) throws IOException, JMSException {
+           Map headers = command.getHeaders();
+
+           // now the body
+           ActiveMQMessage msg;
+           if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) {
+               headers.remove(Stomp.Headers.CONTENT_LENGTH);
+               ActiveMQBytesMessage bm = new ActiveMQBytesMessage();
+               bm.writeBytes(command.getContent());
+               msg = bm;
+           } else {
+               ActiveMQTextMessage text = new ActiveMQTextMessage();
+               try {
+                   text.setText(new String(command.getContent(), "UTF-8"));
+               } catch (Throwable e) {
+                   throw (ProtocolException)new ProtocolException("Text could not bet set: "+e).initCause(e);
+               }
+               msg = text;
+           }
+
+           String destination = (String) headers.remove(Stomp.Headers.Send.DESTINATION);
+           msg.setDestination(convertDestination(destination));
+
+           // the standard JMS headers
+           msg.setJMSCorrelationID((String) headers.remove(Stomp.Headers.Send.CORRELATION_ID));
+
+           // Numeric headers come straight from the client; report garbage as a
+           // protocol error instead of letting NumberFormatException escape.
+           Object o = headers.remove(Stomp.Headers.Send.EXPIRATION_TIME);
+           if (o != null) {
+               try {
+                   msg.setJMSExpiration(Long.parseLong((String) o));
+               } catch (NumberFormatException e) {
+                   throw (ProtocolException)new ProtocolException("Invalid "+Stomp.Headers.Send.EXPIRATION_TIME+" header: "+o).initCause(e);
+               }
+           }
+
+           o = headers.remove(Stomp.Headers.Send.PRIORITY);
+           if (o != null) {
+               try {
+                   msg.setJMSPriority(Integer.parseInt((String)o));
+               } catch (NumberFormatException e) {
+                   throw (ProtocolException)new ProtocolException("Invalid "+Stomp.Headers.Send.PRIORITY+" header: "+o).initCause(e);
+               }
+           }
+
+           o = headers.remove(Stomp.Headers.Send.TYPE);
+           if (o != null) {
+               msg.setJMSType((String) o);
+           }
+
+           o = headers.remove(Stomp.Headers.Send.REPLY_TO);
+           if( o!=null ) {
+               msg.setJMSReplyTo(convertDestination((String)o));
+           }
+
+           o = headers.remove(Stomp.Headers.Send.PERSISTENT);
+           if (o != null) {
+               msg.setPersistent("true".equals(o));
+           }
+
+           // now the general headers
+           msg.setProperties(headers);
+
+           return msg;
+       }
+       
+       /**
+        * Converts an ActiveMQ message into a STOMP MESSAGE frame.
+        * <p>
+        * The standard JMS headers are copied into the frame headers, followed by
+        * all message properties. Optional JMS headers (correlation id, reply-to
+        * and type) are only emitted when the message has a value for them, so
+        * the frame never carries a null header value. Text messages are encoded
+        * as UTF-8; bytes messages are copied and given a content-length header.
+        * Other message types produce a frame with the default (empty) content.
+        *
+        * @param message the message to convert
+        * @return the MESSAGE frame
+        * @throws IOException if the body cannot be encoded
+        * @throws JMSException if the message cannot be read
+        */
+       public StompCommand convertMessage(ActiveMQMessage message) throws IOException, JMSException {
+
+           StompCommand command = new StompCommand();
+           command.setAction(Stomp.Responses.MESSAGE);
+
+           HashMap headers = new HashMap();
+           command.setHeaders(headers);
+
+           headers.put(Stomp.Headers.Message.DESTINATION, convertDestination(message.getDestination()));
+           headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getJMSMessageID());
+
+           String correlationId = message.getJMSCorrelationID();
+           if (correlationId != null) {
+               headers.put(Stomp.Headers.Message.CORRELATION_ID, correlationId);
+           }
+
+           headers.put(Stomp.Headers.Message.EXPIRATION_TIME, ""+message.getJMSExpiration());
+           if (message.getJMSRedelivered()) {
+               headers.put(Stomp.Headers.Message.REDELIVERED, "true");
+           }
+           headers.put(Stomp.Headers.Message.PRORITY, ""+message.getJMSPriority());
+
+           String replyTo = convertDestination(message.getJMSReplyTo());
+           if (replyTo != null) {
+               headers.put(Stomp.Headers.Message.REPLY_TO, replyTo);
+           }
+
+           headers.put(Stomp.Headers.Message.TIMESTAMP, ""+message.getJMSTimestamp());
+
+           String type = message.getJMSType();
+           if (type != null) {
+               headers.put(Stomp.Headers.Message.TYPE, type);
+           }
+
+           // now lets add all the message headers
+           Map properties = message.getProperties();
+           if (properties != null) {
+               headers.putAll(properties);
+           }
+
+           if( message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE ) {
+
+               ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
+               String text = msg.getText();
+               // A text message without a body keeps the frame's default content.
+               if (text != null) {
+                   command.setContent(text.getBytes("UTF-8"));
+               }
+
+           } else if( message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE ) {
+
+               ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy();
+               byte[] data = new byte[(int)msg.getBodyLength()];
+               msg.readBytes(data);
+
+               headers.put(Stomp.Headers.CONTENT_LENGTH, ""+data.length);
+               command.setContent(data);
+
+           }
+
+           return command;
+       }
+       
+    protected ActiveMQDestination convertDestination(String name) throws 
ProtocolException {
+        if (name == null) {
+            return null;
+        }
+        else if (name.startsWith("/queue/")) {
+            String q_name = name.substring("/queue/".length(), name.length());
+            return ActiveMQDestination.createDestination(q_name, 
ActiveMQDestination.QUEUE_TYPE);
+        }
+        else if (name.startsWith("/topic/")) {
+            String t_name = name.substring("/topic/".length(), name.length());
+            return ActiveMQDestination.createDestination(t_name, 
ActiveMQDestination.TOPIC_TYPE);
+        }
+        else {
+            throw new ProtocolException("Illegal destination name: [" + name + 
"] -- ActiveMQ STOMP destinations " + "must begine with /queue/ or /topic/");
+        }
+
+    }
+
+    protected String convertDestination(Destination d) {
+        if (d == null) {
+            return null;
+        }
+        ActiveMQDestination amq_d = (ActiveMQDestination) d;
+        String p_name = amq_d.getPhysicalName();
+
+        StringBuffer buffer = new StringBuffer();
+        if (amq_d.isQueue()) {
+            buffer.append("/queue/");
+        }
+        if (amq_d.isTopic()) {
+            buffer.append("/topic/");
+        }
+        buffer.append(p_name);
+
+        return buffer.toString();
+    }
+
+       /**
+        * @return the transport filter this converter sends through, or null if
+        *         none has been set
+        */
+       public StompTransportFilter getTransportFilter() {
+           return this.transportFilter;
+       }
+
+       /**
+        * Sets the transport filter that sendToStomp and sendToActiveMQ delegate to.
+        *
+        * @param transportFilter the filter to send through
+        */
+       public void setTransportFilter(StompTransportFilter transportFilter) {
+           this.transportFilter = transportFilter;
+       }
+       
+       /**
+        * Callback for an I/O error on the STOMP side. Currently a no-op: the
+        * error is ignored.
+        *
+        * @param error the I/O error that occurred
+        */
+       public void onStompExcepton(IOException error) {
+           // TODO Auto-generated method stub
+       }
+
+}

Added: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ResponseHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ResponseHandler.java?rev=418550&view=auto
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ResponseHandler.java
 (added)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ResponseHandler.java
 Sat Jul  1 21:18:44 2006
@@ -0,0 +1,30 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp2;
+
+import java.io.IOException;
+
+import org.apache.activemq.command.Response;
+
+/**
+ * Callback interface used by the ProtocolConverter: an implementation is
+ * registered when a command is sent to ActiveMQ and invoked when the matching
+ * Response comes back.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+interface ResponseHandler {
+
+    /**
+     * Invoked with the Response to the command this handler was registered for.
+     *
+     * @param converter the converter that received the response
+     * @param response  the response sent by ActiveMQ
+     * @throws IOException if the handler fails while writing to the STOMP client
+     */
+    void onResponse(ProtocolConverter converter, Response response) throws IOException;
+}

Added: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompCommand.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompCommand.java?rev=418550&view=auto
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompCommand.java
 (added)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompCommand.java
 Sat Jul  1 21:18:44 2006
@@ -0,0 +1,149 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp2;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.Endpoint;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * Represents all the data in a STOMP frame: the action (command name), the
+ * headers and the raw content bytes.
+ * <p>
+ * The headers map and the content array are never <code>null</code>; a
+ * <code>null</code> passed to the constructor or to a setter is replaced by an
+ * empty value. The default headers map is mutable so that callers may add
+ * headers through {@link #getHeaders()} on a command created with the no-arg
+ * constructor.
+ * <p>
+ * The methods of the {@link Command} interface are implemented as no-op stubs.
+ * 
+ * @author <a href="http://hiramchirino.com">chirino</a> 
+ */
+public class StompCommand implements Command {
+
+    private static final byte[] NO_DATA = new byte[]{};
+
+    private String action;
+    // Must be mutable: callers add headers through getHeaders().put(...).
+    private Map headers = new HashMap();
+    private byte[] content = NO_DATA;
+
+    /**
+     * Creates a fully populated frame.
+     *
+     * @param command the STOMP action
+     * @param headers the frame headers; <code>null</code> is treated as no headers
+     * @param data    the frame content; <code>null</code> is treated as empty content
+     */
+    public StompCommand(String command, HashMap headers, byte[] data) {
+        this.action = command;
+        setHeaders(headers);
+        setContent(data);
+    }
+
+    /**
+     * Creates an empty frame with no action, no headers and no content.
+     */
+    public StompCommand() {
+    }
+
+    /**
+     * @return the STOMP action of this frame, or <code>null</code> if none was set
+     */
+    public String getAction() {
+        return action;
+    }
+
+    /**
+     * @param command the STOMP action of this frame
+     */
+    public void setAction(String command) {
+        this.action = command;
+    }
+
+    /**
+     * @return the frame content, never <code>null</code>
+     */
+    public byte[] getContent() {
+        return content;
+    }
+
+    /**
+     * @param data the frame content; <code>null</code> is stored as an empty array
+     */
+    public void setContent(byte[] data) {
+        this.content = (data == null) ? NO_DATA : data;
+    }
+
+    /**
+     * @return the live headers map of this frame, never <code>null</code>
+     */
+    public Map getHeaders() {
+        return headers;
+    }
+
+    /**
+     * @param headers the frame headers; <code>null</code> is stored as a new empty map
+     */
+    public void setHeaders(Map headers) {
+        this.headers = (headers == null) ? new HashMap() : headers;
+    }
+
+    //
+    // Methods in the Command interface (all stubbed out).
+    //
+    public int getCommandId() {
+        return 0;
+    }
+
+    public Endpoint getFrom() {
+        return null;
+    }
+
+    public Endpoint getTo() {
+        return null;
+    }
+
+    public boolean isBrokerInfo() {
+        return false;
+    }
+
+    public boolean isMessage() {
+        return false;
+    }
+
+    public boolean isMessageAck() {
+        return false;
+    }
+
+    public boolean isMessageDispatch() {
+        return false;
+    }
+
+    public boolean isMessageDispatchNotification() {
+        return false;
+    }
+
+    public boolean isResponse() {
+        return false;
+    }
+
+    public boolean isResponseRequired() {
+        return false;
+    }
+
+    public boolean isShutdownInfo() {
+        return false;
+    }
+
+    public boolean isWireFormatInfo() {
+        return false;
+    }
+
+    public void setCommandId(int value) {
+    }
+
+    public void setFrom(Endpoint from) {
+    }
+
+    public void setResponseRequired(boolean responseRequired) {
+    }
+
+    public void setTo(Endpoint to) {
+    }
+
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return null;
+    }
+
+    public byte getDataStructureType() {
+        return 0;
+    }
+
+    public boolean isMarshallAware() {
+        return false;
+    }
+
+}

Added: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java?rev=418550&view=auto
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java
 (added)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java
 Sat Jul  1 21:18:44 2006
@@ -0,0 +1,136 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.transport.stomp.Stomp;
+
+/**
+ * Keeps track of a STOMP subscription so that acking is correctly done.
+ * <p>
+ * In client-ack mode every dispatched message is remembered, in dispatch
+ * order, until the STOMP client acknowledges it; in auto-ack mode each message
+ * is acknowledged to ActiveMQ as soon as it is dispatched.
+ *  
+ * @author <a href="http://hiramchirino.com">chirino</a> 
+ */
+public class StompSubscription {
+
+    public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
+    public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
+
+    private final ProtocolConverter protocolConverter;
+    private final String subscriptionId;
+    private final ConsumerInfo consumerInfo;
+
+    // JMS message id (String) -> MessageId, kept in dispatch order.
+    private final LinkedHashMap dispatchedMessage = new LinkedHashMap();
+
+    private String ackMode = AUTO_ACK;
+    private ActiveMQDestination destination;
+
+    /**
+     * @param stompTransport the converter used to reach both the STOMP and the ActiveMQ side
+     * @param subscriptionId the STOMP subscription id, may be <code>null</code>
+     * @param consumerInfo   the ActiveMQ consumer backing this subscription
+     */
+    public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo) {
+        this.protocolConverter = stompTransport;
+        this.subscriptionId = subscriptionId;
+        this.consumerInfo = consumerInfo;
+    }
+
+    /**
+     * Forwards a message dispatched by ActiveMQ to the STOMP client as a
+     * MESSAGE frame, after either recording it for a later client ack or
+     * acknowledging it immediately, depending on the ack mode.
+     *
+     * @param md the dispatch received from ActiveMQ
+     * @throws IOException  if the frame cannot be sent to the STOMP client
+     * @throws JMSException if the message cannot be read or converted
+     */
+    void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
+
+        ActiveMQMessage message = (ActiveMQMessage) md.getMessage();
+
+        // Compare by value: the ack mode may be a String instance other than
+        // the constants (for example one parsed from a frame header).
+        if (CLIENT_ACK.equals(ackMode)) {
+            synchronized (this) {
+                dispatchedMessage.put(message.getJMSMessageID(), message.getMessageId());
+            }
+        } else if (AUTO_ACK.equals(ackMode)) {
+            MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
+            protocolConverter.getTransportFilter().sendToActiveMQ(ack);
+        }
+
+        StompCommand command = protocolConverter.convertMessage(message);
+
+        command.setAction(Stomp.Responses.MESSAGE);
+        if (subscriptionId != null) {
+            command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
+        }
+
+        protocolConverter.getTransportFilter().sendToStomp(command);
+    }
+
+    /**
+     * Builds the ActiveMQ ack for a STOMP ACK frame. The ack covers every
+     * message dispatched on this subscription up to and including the given
+     * one; all of those messages are removed from the pending set.
+     *
+     * @param messageId the JMS message id named by the STOMP client
+     * @return the ack to send to ActiveMQ, or <code>null</code> if the message
+     *         is not pending on this subscription
+     */
+    synchronized MessageAck onStompMessageAck(String messageId) {
+
+        if (!dispatchedMessage.containsKey(messageId)) {
+            return null;
+        }
+
+        MessageAck ack = new MessageAck();
+        ack.setDestination(consumerInfo.getDestination());
+        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+        ack.setConsumerId(consumerInfo.getConsumerId());
+
+        int count = 0;
+        for (Iterator iter = dispatchedMessage.keySet().iterator(); iter.hasNext();) {
+
+            String id = (String) iter.next();
+            // Look the value up BEFORE removing the entry; after iter.remove()
+            // the map no longer holds it and get(id) would return null.
+            MessageId dispatchedId = (MessageId) dispatchedMessage.get(id);
+
+            if (ack.getFirstMessageId() == null) {
+                ack.setFirstMessageId(dispatchedId);
+            }
+
+            iter.remove();
+            count++;
+            if (id.equals(messageId)) {
+                ack.setLastMessageId(dispatchedId);
+                break;
+            }
+        }
+
+        ack.setMessageCount(count);
+        return ack;
+    }
+
+    /**
+     * @return the current ack mode
+     */
+    public String getAckMode() {
+        return ackMode;
+    }
+
+    /**
+     * @param ackMode the ack mode, compared by value against {@link #AUTO_ACK}
+     *                and {@link #CLIENT_ACK}
+     */
+    public void setAckMode(String ackMode) {
+        this.ackMode = ackMode;
+    }
+
+    /**
+     * @return the STOMP subscription id, may be <code>null</code>
+     */
+    public String getSubscriptionId() {
+        return subscriptionId;
+    }
+
+    public void setDestination(ActiveMQDestination destination) {
+        this.destination = destination;
+    }
+
+    public ActiveMQDestination getDestination() {
+        return destination;
+    }
+
+    public ConsumerInfo getConsumerInfo() {
+        return consumerInfo;
+    }
+
+}

Added: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFactory.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFactory.java?rev=418550&view=auto
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFactory.java
 (added)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFactory.java
 Sat Jul  1 21:18:44 2006
@@ -0,0 +1,40 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp2;
+
+import java.util.Map;
+
+import org.apache.activeio.command.WireFormat;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+
+/**
+ * Transport factory for the <a href="http://stomp.codehaus.org/">STOMP</a>
+ * protocol, built on top of the TCP transport factory.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class StompTransportFactory extends TcpTransportFactory {
+
+    /**
+     * @return the name of the wire format used when none is configured: "stomp"
+     */
+    protected String getDefaultWireFormatType() {
+        return "stomp";
+    }
+
+    /**
+     * Wraps the transport in a {@link StompTransportFilter} and then lets the
+     * superclass configure the wrapped transport.
+     */
+    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+        Transport stompTransport = new StompTransportFilter(transport);
+        return super.compositeConfigure(stompTransport, format, options);
+    }
+}
\ No newline at end of file

Added: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFilter.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFilter.java?rev=418550&view=auto
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFilter.java
 (added)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFilter.java
 Sat Jul  1 21:18:44 2006
@@ -0,0 +1,95 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp2;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFilter;
+import org.apache.activemq.util.IOExceptionSupport;
+
+/**
+ * Transport filter that translates between STOMP commands and ActiveMQ
+ * commands. It normally sits on top of a TcpTransport that has been
+ * configured with the StompWireFormat.
+ * 
+ * All of the conversion work is done by delegating to the ProtocolConverter. 
+ *  
+ * @author <a href="http://hiramchirino.com">chirino</a> 
+ */
+public class StompTransportFilter extends TransportFilter {
+
+    ProtocolConverter protocolConverter = new ProtocolConverter();
+
+    // Separate locks so traffic in one direction does not block the other.
+    private final Object sendToActiveMQMutex = new Object();
+    private final Object sendToStompMutex = new Object();
+
+    /**
+     * @param next the underlying transport carrying the STOMP frames
+     */
+    public StompTransportFilter(Transport next) {
+        super(next);
+        protocolConverter.setTransportFilter(this);
+    }
+
+    public void start() throws Exception {
+        super.start();
+    }
+
+    public void stop() throws Exception {
+        super.stop();
+    }
+
+    /**
+     * Hands an ActiveMQ command to the protocol converter; a JMSException
+     * raised by the converter is rethrown as an IOException.
+     */
+    public void oneway(Command command) throws IOException {
+        try {
+            protocolConverter.onActiveMQCommad(command);
+        } catch (JMSException jmsFailure) {
+            throw IOExceptionSupport.create(jmsFailure);
+        }
+    }
+
+    /**
+     * Hands a STOMP command received from the underlying transport to the
+     * protocol converter; failures are routed to {@link #onException(IOException)}.
+     */
+    public void onCommand(Command command) {
+        StompCommand stompCommand = (StompCommand) command;
+        IOException failure;
+        try {
+            protocolConverter.onStompCommad(stompCommand);
+            return;
+        } catch (IOException ioFailure) {
+            failure = ioFailure;
+        } catch (JMSException jmsFailure) {
+            failure = IOExceptionSupport.create(jmsFailure);
+        }
+        onException(failure);
+    }
+
+    /**
+     * Notifies the protocol converter first, then the transport listener.
+     */
+    public void onException(IOException error) {
+        protocolConverter.onStompExcepton(error);
+        transportListener.onException(error);
+    }
+
+    /**
+     * Delivers a command to the transport listener, one command at a time.
+     */
+    public void sendToActiveMQ(Command command) {
+        synchronized (sendToActiveMQMutex) {
+            transportListener.onCommand(command);
+        }
+    }
+
+    /**
+     * Writes a STOMP frame to the underlying transport, one frame at a time.
+     */
+    public void sendToStomp(StompCommand command) throws IOException {
+        synchronized (sendToStompMutex) {
+            next.oneway(command);
+        }
+    }
+
+}

Added: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java?rev=418550&view=auto
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java
 (added)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java
 Sat Jul  1 21:18:44 2006
@@ -0,0 +1,200 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp2;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.ProtocolException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.activeio.adapter.PacketInputStream;
+import org.apache.activeio.command.WireFormat;
+import org.apache.activeio.packet.ByteArrayPacket;
+import org.apache.activeio.packet.ByteSequence;
+import org.apache.activeio.packet.Packet;
+import org.apache.activeio.util.ByteArrayOutputStream;
+import org.apache.activemq.transport.stomp.Stomp;
+
+/**
+ * Implements marshalling and unmarshalling of the
+ * <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
+ * <p>
+ * A frame consists of an action line, zero or more <code>name:value</code>
+ * header lines, an empty line, the content and a terminating null byte.
+ * Incoming frames are bounded by the <code>MAX_*</code> limits below and a
+ * {@link ProtocolException} is thrown when a limit is exceeded or the frame is
+ * malformed.
+ */
+public class StompWireFormat implements WireFormat {
+
+    private static final byte[] NO_DATA = new byte[]{};
+    private static final byte[] END_OF_FRAME = new byte[]{0, '\n'};
+
+    private static final int MAX_COMMAND_LENGTH = 1024;
+    private static final int MAX_HEADER_LENGTH = 1024 * 10;
+    private static final int MAX_HEADERS = 1000;
+    private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
+
+    private int version = 1;
+
+    /**
+     * Marshals a {@link StompCommand} into a packet.
+     */
+    public Packet marshal(Object command) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+        marshal(command, dos);
+        dos.close();
+        return new ByteArrayPacket(baos.toByteSequence());
+    }
+
+    /**
+     * Unmarshals a single frame from the given packet.
+     */
+    public Object unmarshal(Packet packet) throws IOException {
+        PacketInputStream stream = new PacketInputStream(packet);
+        DataInputStream dis = new DataInputStream(stream);
+        return unmarshal(dis);
+    }
+
+    /**
+     * Writes a {@link StompCommand} as a STOMP frame: action line, header
+     * lines, an empty line, the content and the end-of-frame marker.
+     *
+     * @param command the command to write; must be a {@link StompCommand}
+     * @param os      the stream to write the frame to
+     * @throws IOException if writing fails
+     */
+    public void marshal(Object command, DataOutputStream os) throws IOException {
+        StompCommand stomp = (StompCommand) command;
+
+        StringBuffer buffer = new StringBuffer();
+        buffer.append(stomp.getAction());
+        buffer.append(Stomp.NEWLINE);
+
+        // Output the headers.
+        for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) {
+            Map.Entry entry = (Map.Entry) iter.next();
+            buffer.append(entry.getKey());
+            buffer.append(Stomp.Headers.SEPERATOR);
+            buffer.append(entry.getValue());
+            buffer.append(Stomp.NEWLINE);
+        }
+
+        // Add a newline to separate the headers from the content.
+        buffer.append(Stomp.NEWLINE);
+
+        os.write(buffer.toString().getBytes("UTF-8"));
+        // A command without content is written as an empty body.
+        byte[] content = stomp.getContent();
+        if (content != null) {
+            os.write(content);
+        }
+        os.write(END_OF_FRAME);
+    }
+
+    /**
+     * Reads one STOMP frame from the stream.
+     *
+     * @param in the stream to read from
+     * @return the frame as a {@link StompCommand}
+     * @throws ProtocolException    if the frame is malformed or exceeds a limit
+     * @throws java.io.EOFException if the stream ends before the frame is complete
+     * @throws IOException          if reading fails
+     */
+    public Object unmarshal(DataInputStream in) throws IOException {
+        String action = readAction(in);
+        HashMap headers = readHeaders(in);
+        byte[] data = readContent(in, headers);
+        return new StompCommand(action, headers, data);
+    }
+
+    /** Skips blank lines and returns the first non-blank line, trimmed, as the action. */
+    private String readAction(DataInputStream in) throws IOException {
+        while (true) {
+            // readLine never returns null: end of stream surfaces as an EOFException.
+            String action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded").trim();
+            if (action.length() > 0) {
+                return action;
+            }
+        }
+    }
+
+    /** Reads header lines up to, and including, the blank line that ends the header section. */
+    private HashMap readHeaders(DataInputStream in) throws IOException {
+        HashMap headers = new HashMap(25);
+        while (true) {
+            String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
+            if (line.trim().length() == 0) {
+                return headers;
+            }
+
+            if (headers.size() >= MAX_HEADERS) {
+                throw new ProtocolException("The maximum number of headers was exceeded");
+            }
+
+            int separatorIndex = line.indexOf(Stomp.Headers.SEPERATOR);
+            if (separatorIndex < 0) {
+                throw new ProtocolException("Unable to parser header line [" + line + "]");
+            }
+            String name = line.substring(0, separatorIndex).trim();
+            String value = line.substring(separatorIndex + 1).trim();
+            headers.put(name, value);
+        }
+    }
+
+    /** Reads the frame content, sized by the content-length header if present, else up to the first null byte. */
+    private byte[] readContent(DataInputStream in, Map headers) throws IOException {
+        String contentLength = (String) headers.get(Stomp.Headers.CONTENT_LENGTH);
+        if (contentLength != null) {
+
+            // Bless the client, he's telling us how much data to read in.
+            int length;
+            try {
+                length = Integer.parseInt(contentLength.trim());
+            } catch (NumberFormatException e) {
+                throw new ProtocolException("Specified content-length is not a valid integer");
+            }
+
+            // The value comes from the peer: reject a negative length here
+            // instead of failing with a NegativeArraySizeException below.
+            if (length < 0) {
+                throw new ProtocolException("Specified content-length must not be negative");
+            }
+            if (length > MAX_DATA_LENGTH) {
+                throw new ProtocolException("The maximum data length was exceeded");
+            }
+
+            byte[] data = new byte[length];
+            in.readFully(data);
+
+            if (in.readByte() != 0) {
+                throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH + " bytes were read and " + "there was no trailing null byte");
+            }
+            return data;
+        }
+
+        // We don't know how much to read.. data ends when we hit a 0
+        byte b;
+        ByteArrayOutputStream baos = null;
+        while ((b = in.readByte()) != 0) {
+
+            if (baos == null) {
+                baos = new ByteArrayOutputStream();
+            } else if (baos.size() >= MAX_DATA_LENGTH) {
+                throw new ProtocolException("The maximum data length was exceeded");
+            }
+
+            baos.write(b);
+        }
+
+        if (baos == null) {
+            return NO_DATA;
+        }
+        baos.close();
+        return baos.toByteArray();
+    }
+
+    /**
+     * Reads bytes up to, but not including, the next newline and decodes them as UTF-8.
+     *
+     * @param maxLength    the maximum number of bytes allowed before the newline
+     * @param errorMessage the message of the exception thrown when the limit is exceeded
+     */
+    private String readLine(DataInputStream in, int maxLength, String errorMessage) throws IOException {
+        byte b;
+        // Let the buffer grow on demand rather than reserving maxLength bytes per line.
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        while ((b = in.readByte()) != '\n') {
+            if (baos.size() >= maxLength) {
+                throw new ProtocolException(errorMessage);
+            }
+            baos.write(b);
+        }
+        ByteSequence sequence = baos.toByteSequence();
+        return new String(sequence.getData(), sequence.getOffset(), sequence.getLength(), "UTF-8");
+    }
+
+    public int getVersion() {
+        return version;
+    }
+
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+}

Added: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormatFactory.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormatFactory.java?rev=418550&view=auto
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormatFactory.java
 (added)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormatFactory.java
 Sat Jul  1 21:18:44 2006
@@ -0,0 +1,29 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp2;
+
+import org.apache.activeio.command.WireFormat;
+import org.apache.activeio.command.WireFormatFactory;
+
+/**
+ * Factory for WireFormat objects that marshal the
+ * <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
+ */
+public class StompWireFormatFactory implements WireFormatFactory {
+
+    /**
+     * @return a new {@link StompWireFormat} instance
+     */
+    public WireFormat createWireFormat() {
+        WireFormat stompFormat = new StompWireFormat();
+        return stompFormat;
+    }
+}

Modified: 
incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp?rev=418550&r1=418549&r2=418550&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp
 Sat Jul  1 21:18:44 2006
@@ -1 +1 @@
-class=org.apache.activemq.transport.stomp.StompTransportFactory
+class=org.apache.activemq.transport.stomp2.StompTransportFactory

Modified: 
incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp?rev=418550&r1=418549&r2=418550&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp
 Sat Jul  1 21:18:44 2006
@@ -1 +1 @@
-class=org.apache.activemq.transport.stomp.StompWireFormatFactory
\ No newline at end of file
+class=org.apache.activemq.transport.stomp2.StompWireFormatFactory
\ No newline at end of file


Reply via email to