Author: dejanb
Date: Fri Aug  7 09:18:23 2009
New Revision: 801916

URL: http://svn.apache.org/viewvc?rev=801916&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2280 - STOMP transactions and multiple destinations

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=801916&r1=801915&r2=801916&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java Fri Aug  7 09:18:23 2009
@@ -123,18 +123,19 @@
     
     synchronized void onStompCommit(TransactionId transactionId) {
        // ack all messages
-        MessageAck ack = new MessageAck();
-        ack.setDestination(consumerInfo.getDestination());
-        ack.setConsumerId(consumerInfo.getConsumerId());
-        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
-        ack.setFirstMessageId(unconsumedMessage.getFirst().getMessage().getMessageId());
-        ack.setLastMessageId(unconsumedMessage.getLast().getMessage().getMessageId());
-        ack.setMessageCount(unconsumedMessage.size());
-        ack.setTransactionId(transactionId);
-        protocolConverter.getTransportFilter().sendToActiveMQ(ack);
-        // clear lists
-       unconsumedMessage.clear();
-       dispatchedMessage.clear();
+       if (!unconsumedMessage.isEmpty()) {
+               MessageAck ack = new MessageAck();
+               ack.setDestination(consumerInfo.getDestination());
+               ack.setConsumerId(consumerInfo.getConsumerId());
+               ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+               ack.setFirstMessageId(unconsumedMessage.getFirst().getMessage().getMessageId());
+               ack.setLastMessageId(unconsumedMessage.getLast().getMessage().getMessageId());
+               ack.setMessageCount(unconsumedMessage.size());
+               ack.setTransactionId(transactionId);
+               protocolConverter.getTransportFilter().sendToActiveMQ(ack);
+               // clear lists
+               unconsumedMessage.clear();
+       }
     }
 
     synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
@@ -169,9 +170,8 @@
                 if (transactionId != null) {
                        if (!unconsumedMessage.contains(msg))
                                unconsumedMessage.add(msg);
-                } else {
-                       iter.remove();
                 }
+                iter.remove();
                 
                 
                 count++;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=801916&r1=801915&r2=801916&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Fri Aug  7 09:18:23 2009
@@ -998,7 +998,38 @@
         
         stompDisconnect();
        
-    }       
+    }
+    
+    public void testTransactionsWithMultipleDestinations() throws Exception {
+
+       stompConnection.connect("system", "manager");
+       
+        HashMap<String, String> headers = new HashMap<String, String>();
+        headers.put("activemq.prefetchSize", "1");
+        headers.put("activemq.exclusive", "true");
+       
+       stompConnection.subscribe("/queue/test1", "client", headers);
+       
+       stompConnection.begin("ID:tx1");
+       
+       headers.clear();
+       headers.put("receipt", "ID:msg1");
+       stompConnection.send("/queue/test2", "test message", "ID:tx1", headers);
+       
+       stompConnection.commit("ID:tx1");
+       
+       // make sure connection is active after commit
+       Thread.sleep(1000);
+       stompConnection.send("/queue/test1", "another message");
+       
+       StompFrame frame = stompConnection.receive(500);
+       System.out.println(frame);
+       assertNotNull(frame);
+       
+       
+       stompConnection.disconnect();
+    }
+    
     protected void assertClients(int expected) throws Exception {
         org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
         int actual = clients.length;


Reply via email to