Author: dejanb
Date: Fri Aug 7 09:18:23 2009
New Revision: 801916
URL: http://svn.apache.org/viewvc?rev=801916&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-2280 - stomp transactions
and multiple destinations
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=801916&r1=801915&r2=801916&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java Fri Aug 7 09:18:23 2009
@@ -123,18 +123,19 @@
synchronized void onStompCommit(TransactionId transactionId) {
// ack all messages
- MessageAck ack = new MessageAck();
- ack.setDestination(consumerInfo.getDestination());
- ack.setConsumerId(consumerInfo.getConsumerId());
- ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
- ack.setFirstMessageId(unconsumedMessage.getFirst().getMessage().getMessageId());
- ack.setLastMessageId(unconsumedMessage.getLast().getMessage().getMessageId());
- ack.setMessageCount(unconsumedMessage.size());
- ack.setTransactionId(transactionId);
- protocolConverter.getTransportFilter().sendToActiveMQ(ack);
- // clear lists
- unconsumedMessage.clear();
- dispatchedMessage.clear();
+ if (!unconsumedMessage.isEmpty()) {
+ MessageAck ack = new MessageAck();
+ ack.setDestination(consumerInfo.getDestination());
+ ack.setConsumerId(consumerInfo.getConsumerId());
+ ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+ ack.setFirstMessageId(unconsumedMessage.getFirst().getMessage().getMessageId());
+ ack.setLastMessageId(unconsumedMessage.getLast().getMessage().getMessageId());
+ ack.setMessageCount(unconsumedMessage.size());
+ ack.setTransactionId(transactionId);
+ protocolConverter.getTransportFilter().sendToActiveMQ(ack);
+ // clear lists
+ unconsumedMessage.clear();
+ }
}
synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
@@ -169,9 +170,8 @@
if (transactionId != null) {
if (!unconsumedMessage.contains(msg))
unconsumedMessage.add(msg);
- } else {
- iter.remove();
}
+ iter.remove();
count++;
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=801916&r1=801915&r2=801916&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Fri Aug 7 09:18:23 2009
@@ -998,7 +998,38 @@
stompDisconnect();
- }
+ }
+
+ public void testTransactionsWithMultipleDestinations() throws Exception {
+
+ stompConnection.connect("system", "manager");
+
+ HashMap<String, String> headers = new HashMap<String, String>();
+ headers.put("activemq.prefetchSize", "1");
+ headers.put("activemq.exclusive", "true");
+
+ stompConnection.subscribe("/queue/test1", "client", headers);
+
+ stompConnection.begin("ID:tx1");
+
+ headers.clear();
+ headers.put("receipt", "ID:msg1");
+ stompConnection.send("/queue/test2", "test message", "ID:tx1", headers);
+
+ stompConnection.commit("ID:tx1");
+
+ // make sure connection is active after commit
+ Thread.sleep(1000);
+ stompConnection.send("/queue/test1", "another message");
+
+ StompFrame frame = stompConnection.receive(500);
+ System.out.println(frame);
+ assertNotNull(frame);
+
+
+ stompConnection.disconnect();
+ }
+
protected void assertClients(int expected) throws Exception {
org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
int actual = clients.length;