Author: dejanb
Date: Thu Jan 29 16:01:35 2009
New Revision: 738904

URL: http://svn.apache.org/viewvc?rev=738904&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-1807

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=738904&r1=738903&r2=738904&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Thu Jan 29 16:01:35 2009
@@ -275,7 +275,7 @@
         boolean acked = false;
         for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
             StompSubscription sub = iter.next();
-            MessageAck ack = sub.onStompMessageAck(messageId);
+            MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
             if (ack != null) {
                 ack.setTransactionId(activemqTx);
                 sendToActiveMQ(ack, createResponseHandler(command));
@@ -331,6 +331,11 @@
         if (activemqTx == null) {
             throw new ProtocolException("Invalid transaction id: " + stompTx);
         }
+        
+        for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
+            StompSubscription sub = iter.next();
+            sub.onStompCommit(activemqTx);
+        }
 
         TransactionInfo tx = new TransactionInfo();
         tx.setConnectionId(connectionId);
@@ -338,6 +343,7 @@
         tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
 
         sendToActiveMQ(tx, createResponseHandler(command));
+        
     }
 
     protected void onStompAbort(StompFrame command) throws ProtocolException {
@@ -353,6 +359,14 @@
         if (activemqTx == null) {
             throw new ProtocolException("Invalid transaction id: " + stompTx);
         }
+        for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
+            StompSubscription sub = iter.next();
+            try {
+               sub.onStompAbort(activemqTx);
+            } catch (Exception e) {
+               throw new ProtocolException("Transaction abort failed", false, e);
+            }
+        }
 
         TransactionInfo tx = new TransactionInfo();
         tx.setConnectionId(connectionId);
@@ -543,7 +557,6 @@
      * @throws IOException
      */
     public void onActiveMQCommad(Command command) throws IOException, JMSException {
-
         if (command.isResponse()) {
 
             Response response = (Response)command;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=738904&r1=738903&r2=738904&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java Thu Jan 29 16:01:35 2009
@@ -19,6 +19,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -30,9 +31,10 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
 
 /**
- * Keeps track of the STOMP susbscription so that acking is correctly done.
+ * Keeps track of the STOMP subscription so that acking is correctly done.
  * 
  * @author <a href="http://hiramchirino.com">chirino</a>
  */
@@ -46,11 +48,13 @@
     private final String subscriptionId;
     private final ConsumerInfo consumerInfo;
 
-    private final LinkedHashMap<String, MessageId> dispatchedMessage = new LinkedHashMap<String, MessageId>();
+    private final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>();
+    private final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>();
 
     private String ackMode = AUTO_ACK;
     private ActiveMQDestination destination;
     private String transformation;
+    
 
     public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
         this.protocolConverter = stompTransport;
@@ -60,16 +64,14 @@
     }
 
     void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
-
         ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
-
         if (ackMode == CLIENT_ACK) {
             synchronized (this) {
-                dispatchedMessage.put(message.getJMSMessageID(), message.getMessageId());
+                dispatchedMessage.put(message.getMessageId(), md);
             }
         } else if (ackMode == INDIVIDUAL_ACK) {
             synchronized (this) {
-                dispatchedMessage.put(message.getJMSMessageID(), message.getMessageId());
+                dispatchedMessage.put(message.getMessageId(), md);
             }
         } else if (ackMode == AUTO_ACK) {
             MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
@@ -86,19 +88,60 @@
                        ignoreTransformation = true;
                }
         }
+        
         StompFrame command = protocolConverter.convertMessage(message, ignoreTransformation);
 
         command.setAction(Stomp.Responses.MESSAGE);
         if (subscriptionId != null) {
             command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
         }
-
+        
         protocolConverter.getTransportFilter().sendToStomp(command);
     }
-
-    synchronized MessageAck onStompMessageAck(String messageId) {
-
-        if (!dispatchedMessage.containsKey(messageId)) {
+    
+    synchronized void onStompAbort(TransactionId transactionId) throws IOException, JMSException {
+       //ack all unacked messages
+       for (MessageDispatch md : dispatchedMessage.values()) {
+               if (!unconsumedMessage.contains(md)) {
+               MessageAck ack = new MessageAck();
+               ack.setDestination(consumerInfo.getDestination());
+               ack.setConsumerId(consumerInfo.getConsumerId());
+               ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
+               ack.setFirstMessageId(md.getMessage().getMessageId());
+               ack.setLastMessageId(md.getMessage().getMessageId());
+               ack.setMessageCount(1);
+               ack.setTransactionId(transactionId);
+               protocolConverter.getTransportFilter().sendToActiveMQ(ack);
+               unconsumedMessage.add(md);
+               }
+       }
+       // redeliver all unconsumed messages
+       for (MessageDispatch md : unconsumedMessage) {
+               onMessageDispatch(md);
+       }
+    }
+    
+    synchronized void onStompCommit(TransactionId transactionId) {
+       // ack all messages
+        MessageAck ack = new MessageAck();
+        ack.setDestination(consumerInfo.getDestination());
+        ack.setConsumerId(consumerInfo.getConsumerId());
+        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+        ack.setFirstMessageId(unconsumedMessage.getFirst().getMessage().getMessageId());
+        ack.setLastMessageId(unconsumedMessage.getLast().getMessage().getMessageId());
+        ack.setMessageCount(unconsumedMessage.size());
+        ack.setTransactionId(transactionId);
+        protocolConverter.getTransportFilter().sendToActiveMQ(ack);
+        // clear lists
+       unconsumedMessage.clear();
+       dispatchedMessage.clear();
+    }
+
+    synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
+       
+       MessageId msgId = new MessageId(messageId);
+       
+        if (!dispatchedMessage.containsKey(msgId)) {
             return null;
         }
 
@@ -107,33 +150,50 @@
         ack.setConsumerId(consumerInfo.getConsumerId());
 
         if (ackMode == CLIENT_ACK) {
-            ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+               if (transactionId != null) {
+                       ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
+               } else {
+                       ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+               }
             int count = 0;
             for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
 
                 Map.Entry entry = (Entry)iter.next();
-                String id = (String)entry.getKey();
-                MessageId msgid = (MessageId)entry.getValue();
+                MessageId id = (MessageId)entry.getKey();
+                MessageDispatch msg = (MessageDispatch)entry.getValue();
 
                 if (ack.getFirstMessageId() == null) {
-                    ack.setFirstMessageId(msgid);
+                    ack.setFirstMessageId(id);
                 }
-
-                iter.remove();
+                
+                if (transactionId != null) {
+                       if (!unconsumedMessage.contains(msg))
+                               unconsumedMessage.add(msg);
+                } else {
+                       iter.remove();
+                }
+                
+                
                 count++;
 
-                if (id.equals(messageId)) {
-                    ack.setLastMessageId(msgid);
+                if (id.equals(msgId)) {
+                    ack.setLastMessageId(id);
                     break;
                 }
 
             }
             ack.setMessageCount(count);
+            if (transactionId != null) {
+               ack.setTransactionId(transactionId);
+            }
         }
         else if (ackMode == INDIVIDUAL_ACK) {
             ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
-            MessageId msgid = (MessageId)dispatchedMessage.get(messageId);
-            ack.setMessageID(msgid);
+            ack.setMessageID(msgId);
+            if (transactionId != null) {
+               unconsumedMessage.add(dispatchedMessage.get(msgId));
+               ack.setTransactionId(transactionId);
+            } 
             dispatchedMessage.remove(messageId);
         }
         return ack;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=738904&r1=738903&r2=738904&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Thu Jan 29 16:01:35 2009
@@ -46,7 +46,6 @@
 import org.apache.activemq.broker.jmx.BrokerViewMBean;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -937,24 +936,55 @@
         sendMessage("message 1");
         sendMessage("message 2");
         sendMessage("message 3");
+        sendMessage("message 4");
+        sendMessage("message 5");
+        
         
-        StompFrame frame = stompConnection.receive();
 
+        StompFrame frame = stompConnection.receive();
+        assertEquals(frame.getBody(), "message 1");
+        
         stompConnection.begin("tx1");
         stompConnection.ack(frame, "tx1");
 
         StompFrame frame1 = stompConnection.receive();
-        
+        assertEquals(frame1.getBody(), "message 2");
+               
         try {
                StompFrame frame2 = stompConnection.receive(500);
                if (frame2 != null) {
                        fail("Should not have received the second message");
                }
         } catch (SocketTimeoutException soe) {}
+        
+        Thread.sleep(100);
+        stompConnection.abort("tx1");
+        
+        stompConnection.begin("tx2");
+        
+        StompFrame frame3 = stompConnection.receive();
+        assertEquals(frame3.getBody(), "message 1");
+        stompConnection.ack(frame3, "tx2");
+        
+        StompFrame frame4 = stompConnection.receive();
+        assertEquals(frame4.getBody(), "message 2");
+        stompConnection.ack(frame4, "tx2");
+        
+        StompFrame frame5 = stompConnection.receive();
+        assertEquals(frame5.getBody(), "message 3");        
+        stompConnection.ack(frame5, "tx2");
+        
+        stompConnection.commit("tx2");
+        
+        stompConnection.begin("tx3");
+        StompFrame frame6 = stompConnection.receive();
+        assertEquals(frame6.getBody(), "message 4");
+        stompConnection.ack(frame6, "tx3");
+        stompConnection.commit("tx3");
+        
         stompDisconnect();
        
-    }    
-    
+    }       
     protected void assertClients(int expected) throws Exception {
         org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
         int actual = clients.length;
@@ -969,3 +999,5 @@
         Thread.sleep(2000);
     }
 }
+
+


Reply via email to