Author: dejanb
Date: Thu Sep 1 08:28:47 2011
New Revision: 1163940
URL: http://svn.apache.org/viewvc?rev=1163940&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3481 - stomp deadlock
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=1163940&r1=1163939&r2=1163940&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Thu Sep 1 08:28:47 2011
@@ -538,7 +538,9 @@ public class ProtocolConverter {
if (subscriptionId != null) {
subscriptions.put(subscriptionId, stompSubscription);
}
- sendToActiveMQ(consumerInfo, createResponseHandler(command));
+
+ sendToActiveMQ(consumerInfo, null);
+ sendReceipt(command);
}
protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
@@ -840,4 +842,19 @@ public class ProtocolConverter {
}
}
}
+
+ protected void sendReceipt(StompFrame command) {
+ final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+ if (receiptId != null) {
+ StompFrame sc = new StompFrame();
+ sc.setAction(Stomp.Responses.RECEIPT);
+ sc.setHeaders(new HashMap<String, String>(1));
+ sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+ try {
+ sendToStomp(sc);
+ } catch (IOException e) {
+ LOG.warn("Could not send a receipt for " + command, e);
+ }
+ }
+ }
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1163940&r1=1163939&r2=1163940&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Thu Sep 1 08:28:47 2011
@@ -1559,6 +1559,44 @@ public class StompTest extends Combinati
assertNotNull(stompMessage);
assertNull(stompMessage.getHeaders().get(Stomp.Headers.Message.PERSISTENT));
}
+
+ public void testReceiptNewQueue() throws Exception {
+
+ String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + 1234 + "\n" + "id:8fee4b8-4e5c9f66-4703-e936-3" + "\n" + "receipt:8fee4b8-4e5c9f66-4703-e936-2" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ StompFrame receipt = stompConnection.receive();
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ assertEquals("8fee4b8-4e5c9f66-4703-e936-2", receipt.getHeaders().get("receipt-id"));
+
+
+ frame = "SEND\n destination:/queue/" + getQueueName() + 123 + "\ncontent-length:0" + " \n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+
+ frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + 123 + "\n" + "id:8fee4b8-4e5c9f66-4703-e936-2" + "\n" + "receipt:8fee4b8-4e5c9f66-4703-e936-1" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ receipt = stompConnection.receive();
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ assertEquals("8fee4b8-4e5c9f66-4703-e936-1", receipt.getHeaders().get("receipt-id"));
+
+ StompFrame message = stompConnection.receive();
+ assertTrue(message.getAction().startsWith("MESSAGE"));
+
+ String length = message.getHeaders().get("content-length");
+ assertEquals("0", length);
+ assertEquals(0, message.getContent().length);
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ }
protected void assertClients(int expected) throws Exception {
org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();