Author: dejanb
Date: Tue Nov 18 05:56:13 2008
New Revision: 718590

URL: http://svn.apache.org/viewvc?rev=718590&view=rev
Log:
fix for AMQ-2003

Added:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
      - copied, changed from r706270, 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java
    activemq/trunk/assembly/src/release/example/src/StompExample.java   (with 
props)
Removed:
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java
Modified:
    activemq/trunk/assembly/src/release/example/build.xml

Copied: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
 (from r706270, 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java)
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java&r1=706270&r2=718590&rev=718590&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
 Tue Nov 18 05:56:13 2008
@@ -18,11 +18,17 @@
 package org.apache.activemq.transport.stomp;
 
 import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
 import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe;
 
 public class StompConnection {
 
@@ -53,12 +59,24 @@
         outputStream.write(0);
         outputStream.flush();
     }
+    
+    public StompFrame receive() throws Exception {
+        return receive(RECEIVE_TIMEOUT);
+    }    
+    
+    public StompFrame receive(long timeOut) throws Exception {
+       stompSocket.setSoTimeout((int)timeOut);
+       InputStream is = stompSocket.getInputStream();
+        StompWireFormat wf = new StompWireFormat();
+        DataInputStream dis = new DataInputStream(is);
+        return (StompFrame)wf.unmarshal(dis);
+    }
 
     public String receiveFrame() throws Exception {
         return receiveFrame(RECEIVE_TIMEOUT);
     }
 
-    private String receiveFrame(long timeOut) throws Exception {
+    public String receiveFrame(long timeOut) throws Exception {
         stompSocket.setSoTimeout((int)timeOut);
         InputStream is = stompSocket.getInputStream();
         int c = 0;
@@ -87,5 +105,115 @@
        public void setStompSocket(Socket stompSocket) {
                this.stompSocket = stompSocket;
        }
+       
+    public void connect(String username, String password) throws Exception {
+       HashMap<String, String> headers = new HashMap();
+       headers.put("login", username);
+       headers.put("password", password);
+       StompFrame frame = new StompFrame("CONNECT", headers);
+        sendFrame(frame.toString());
+    }
+    
+    public void disconnect() throws Exception {
+       StompFrame frame = new StompFrame("DISCONNECT");
+        sendFrame(frame.toString());           
+    }
+    
+    public void send(String destination, String message) throws Exception {
+       send(destination, message, null);
+    }
+       
+    public void send(String destination, String message, HashMap<String, 
String> headers) throws Exception {
+       if (headers == null) {
+               headers = new HashMap<String, String>();
+       }
+       headers.put("destination", destination);
+       StompFrame frame = new StompFrame("SEND", headers, message.getBytes());
+        sendFrame(frame.toString());           
+    }
+    
+    public void subscribe(String destination) throws Exception {
+       subscribe(destination, null, null);
+    }
+    
+    public void subscribe(String destination, String ack) throws Exception {
+       subscribe(destination, ack, new HashMap<String, String>());
+    }
+    
+    public void subscribe(String destination, String ack, HashMap<String, 
String> headers) throws Exception {
+               if (headers == null) {
+                       headers = new HashMap<String, String>();
+               }
+               headers.put("destination", destination);
+       if (ack != null) {
+               headers.put("ack", ack);
+       }
+       StompFrame frame = new StompFrame("SUBSCRIBE", headers);
+        sendFrame(frame.toString());           
+    }
+    
+    public void unsubscribe(String destination) throws Exception {
+       unsubscribe(destination, null);
+    }
+    
+    public void unsubscribe(String destination, HashMap<String, String> 
headers) throws Exception {
+               if (headers == null) {
+                       headers = new HashMap<String, String>();
+               }
+               headers.put("destination", destination);
+       StompFrame frame = new StompFrame("UNSUBSCRIBE", headers);
+        sendFrame(frame.toString());           
+    }    
+    
+    public void begin(String transaction) throws Exception {
+       HashMap<String, String> headers = new HashMap<String, String>();
+       headers.put("transaction", transaction);
+       StompFrame frame = new StompFrame("BEGIN", headers);
+       sendFrame(frame.toString());
+    }
+    
+    public void abort(String transaction) throws Exception {
+       HashMap<String, String> headers = new HashMap<String, String>();
+       headers.put("transaction", transaction);
+       StompFrame frame = new StompFrame("ABORT", headers);
+       sendFrame(frame.toString());
+    }
+    
+    public void commit(String transaction) throws Exception {
+       HashMap<String, String> headers = new HashMap<String, String>();
+       headers.put("transaction", transaction);
+       StompFrame frame = new StompFrame("COMMIT", headers);
+       sendFrame(frame.toString());
+    }
+    
+    public void ack(StompFrame frame) throws Exception {
+       ack(frame.getHeaders().get("message-id"), null);
+    }    
+    
+    public void ack(StompFrame frame, String transaction) throws Exception {
+       ack(frame.getHeaders().get("message-id"), transaction);
+    }
+    
+    public void ack(String messageId) throws Exception {
+       ack(messageId, null);
+    }
+    
+    public void ack(String messageId, String transaction) throws Exception {
+       HashMap<String, String> headers = new HashMap<String, String>();
+       headers.put("message-id", messageId);
+       if (transaction != null)
+               headers.put("transaction", transaction);
+       StompFrame frame = new StompFrame("ACK", headers);
+       sendFrame(frame.toString());    
+    }
+    
+    protected String appendHeaders(HashMap<String, Object> headers) {
+       StringBuffer result = new StringBuffer();
+       for (String key : headers.keySet()) {
+               result.append(key + ":" + headers.get(key) + "\n");
+       }
+       result.append("\n");
+       return result.toString();
+    }
 
 }

Modified: activemq/trunk/assembly/src/release/example/build.xml
URL: 
http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/build.xml?rev=718590&r1=718589&r2=718590&view=diff
==============================================================================
--- activemq/trunk/assembly/src/release/example/build.xml (original)
+++ activemq/trunk/assembly/src/release/example/build.xml Tue Nov 18 05:56:13 
2008
@@ -290,4 +290,12 @@
                </java>
        </target>       
        
+       <target name="stomp" depends="compile" description="Runs a Stomp 
example">
+               <echo>Running a Stomp example</echo>
+               <java classname="StompExample" fork="yes" maxmemory="100M">
+                       <classpath refid="javac.classpath" />
+                       <jvmarg value="-server" />
+               </java>
+       </target>
+       
 </project>

Added: activemq/trunk/assembly/src/release/example/src/StompExample.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/src/StompExample.java?rev=718590&view=auto
==============================================================================
--- activemq/trunk/assembly/src/release/example/src/StompExample.java (added)
+++ activemq/trunk/assembly/src/release/example/src/StompExample.java Tue Nov 
18 05:56:13 2008
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompConnection;
+import org.apache.activemq.transport.stomp.StompFrame;
+import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe;
+
+/**
+ * 
+ * This example demonstrates Stomp Java API
+ * 
+ * @version $Revision$
+ *
+ */
+public class StompExample {
+
+       public static void main(String args[]) throws Exception {
+               StompConnection connection = new StompConnection();
+               connection.open("localhost", 61613);
+               
+               connection.connect("system", "manager");
+               StompFrame connect = connection.receive();
+               if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
+                       throw new Exception ("Not connected");
+               }
+               
+               connection.begin("tx1");
+               connection.send("/queue/test", "message1");
+               connection.send("/queue/test", "message2");
+               connection.commit("tx1");
+               
+               connection.subscribe("/queue/test", 
Subscribe.AckModeValues.CLIENT);
+               
+               connection.begin("tx2");
+               
+               StompFrame message = connection.receive();
+               System.out.println(message.getBody());
+               connection.ack(message, "tx2");
+               
+               message = connection.receive();
+               System.out.println(message.getBody());
+               connection.ack(message, "tx2");
+               
+               connection.commit("tx2");
+               
+               connection.disconnect();
+       }
+       
+}

Propchange: activemq/trunk/assembly/src/release/example/src/StompExample.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to