Author: gtully
Date: Wed Sep 14 10:46:48 2011
New Revision: 1170523
URL: http://svn.apache.org/viewvc?rev=1170523&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3497 - send subscription receipt
early such that it cannot be preceded by a message dispatch
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=1170523&r1=1170522&r2=1170523&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
Wed Sep 14 10:46:48 2011
@@ -539,8 +539,9 @@ public class ProtocolConverter {
subscriptions.put(subscriptionId, stompSubscription);
}
- sendToActiveMQ(consumerInfo, null);
+ // dispatch can beat the receipt so send it early
sendReceipt(command);
+ sendToActiveMQ(consumerInfo, null);
}
protected void onStompUnsubscribe(StompFrame command) throws
ProtocolException {
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1170523&r1=1170522&r2=1170523&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Wed Sep 14 10:46:48 2011
@@ -113,6 +113,7 @@ public class StompTest extends Combinati
+ "}}";
}
broker = BrokerFactory.createBroker(new URI(confUri));
+ broker.setDeleteAllMessagesOnStartup(true);
broker.start();
broker.waitUntilStarted();
@@ -375,6 +376,51 @@ public class StompTest extends Combinati
stompConnection.sendFrame(frame);
}
+
+ public void testSubscriptionReceipts() throws Exception {
+ final int done = 500;
+ int count = 0;
+
+ URI connectUri = new URI(bindAddress);
+
+ do {
+
+ StompConnection sender = new StompConnection();
+ sender.open(createSocket(connectUri));
+ String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
+ sender.sendFrame(frame);
+
+ frame = sender.receiveFrame();
+ assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World:" + count + Stomp.NULL;
+ sender.sendFrame(frame);
+
+ sender.disconnect();
+
+ StompConnection receiver = new StompConnection();
+ receiver.open(createSocket(connectUri));
+
+ frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
+ receiver.sendFrame(frame);
+
+ frame = receiver.receiveFrame();
+ assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() +
"\n" + "receipt: " + (count++) + "\n\n" + Stomp.NULL;
+ receiver.sendFrame(frame);
+
+ frame = receiver.receiveFrame();
+ assertTrue("" + frame, frame.startsWith("RECEIPT"));
+ assertTrue("Receipt contains receipt-id",
frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0);
+ LOG.info("received: " +
frame.substring(frame.indexOf(Stomp.Headers.Response.RECEIPT_ID)));
+ frame = receiver.receiveFrame();
+ assertTrue("" + frame, frame.startsWith("MESSAGE"));
+ receiver.disconnect();
+ } while (count < done);
+
+ }
+
public void testSubscribeWithAutoAck() throws Exception {
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;