Author: gtully
Date: Wed Sep 14 10:46:48 2011
New Revision: 1170523

URL: http://svn.apache.org/viewvc?rev=1170523&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3497 - send subscription receipt 
early so that it cannot be preceded by a message dispatch

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=1170523&r1=1170522&r2=1170523&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
 Wed Sep 14 10:46:48 2011
@@ -539,8 +539,9 @@ public class ProtocolConverter {
             subscriptions.put(subscriptionId, stompSubscription);
         }
 
-        sendToActiveMQ(consumerInfo, null);
+        // dispatch can beat the receipt so send it early
         sendReceipt(command);
+        sendToActiveMQ(consumerInfo, null);
     }
 
     protected void onStompUnsubscribe(StompFrame command) throws 
ProtocolException {

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1170523&r1=1170522&r2=1170523&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
 Wed Sep 14 10:46:48 2011
@@ -113,6 +113,7 @@ public class StompTest extends Combinati
                 + "}}";
         }
         broker = BrokerFactory.createBroker(new URI(confUri));
+        broker.setDeleteAllMessagesOnStartup(true);
         broker.start();
         broker.waitUntilStarted();
 
@@ -375,6 +376,51 @@ public class StompTest extends Combinati
         stompConnection.sendFrame(frame);
     }
 
+
+    public void testSubscriptionReceipts() throws Exception {
+        final int done = 500;
+        int count = 0;
+
+        URI connectUri = new URI(bindAddress);
+
+        do {
+
+            StompConnection sender = new StompConnection();
+            sender.open(createSocket(connectUri));
+            String frame = "CONNECT\n" + "login: system\n" + "passcode: 
manager\n\n" + Stomp.NULL;
+            sender.sendFrame(frame);
+
+            frame = sender.receiveFrame();
+            assertTrue(frame.startsWith("CONNECTED"));
+
+            frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" 
+ "Hello World:" + count + Stomp.NULL;
+            sender.sendFrame(frame);
+
+            sender.disconnect();
+
+            StompConnection receiver = new StompConnection();
+            receiver.open(createSocket(connectUri));
+
+            frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" 
+ Stomp.NULL;
+            receiver.sendFrame(frame);
+
+            frame = receiver.receiveFrame();
+            assertTrue(frame.startsWith("CONNECTED"));
+
+            frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + 
"\n"  + "receipt: " + (count++) + "\n\n" + Stomp.NULL;
+            receiver.sendFrame(frame);
+
+            frame = receiver.receiveFrame();
+            assertTrue("" + frame, frame.startsWith("RECEIPT"));
+            assertTrue("Receipt contains receipt-id", 
frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0);
+            LOG.info("received: " + 
frame.substring(frame.indexOf(Stomp.Headers.Response.RECEIPT_ID)));
+            frame = receiver.receiveFrame();
+            assertTrue("" + frame, frame.startsWith("MESSAGE"));
+            receiver.disconnect();
+        } while (count < done);
+
+    }
+
     public void testSubscribeWithAutoAck() throws Exception {
 
         String frame = "CONNECT\n" + "login: system\n" + "passcode: 
manager\n\n" + Stomp.NULL;


Reply via email to