Author: dejanb
Date: Mon Aug 17 11:43:34 2009
New Revision: 804943

URL: http://svn.apache.org/viewvc?rev=804943&view=rev
Log:
Additional fix for https://issues.apache.org/activemq/browse/AMQ-1807 - fixing
the test case and making it work

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=804943&r1=804942&r2=804943&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 Mon Aug 17 11:43:34 2009
@@ -236,7 +236,6 @@
                                                 dequeueCounter++;
                                                 dispatched.remove(node);
                                                 
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
-                                                prefetchExtension--;
                                             }
                                         }
 

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=804943&r1=804942&r2=804943&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
 Mon Aug 17 11:43:34 2009
@@ -361,11 +361,7 @@
         }
         for (Iterator<StompSubscription> iter = 
subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
             StompSubscription sub = iter.next();
-            try {
-               sub.onStompAbort(activemqTx);
-            } catch (Exception e) {
-               throw new ProtocolException("Transaction abort failed", false, 
e);
-            }
+            sub.onStompAbort(activemqTx);
         }
 
         TransactionInfo tx = new TransactionInfo();

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=804943&r1=804942&r2=804943&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
 Mon Aug 17 11:43:34 2009
@@ -188,15 +188,6 @@
     }
     
     public void abort(String transaction) throws Exception {
-       // discard all content on the wire before
-       // aborting the transaction
-       try {
-               StompFrame discarded = this.receive(100);
-               while (discarded != null) {
-                       discarded = this.receive(100);
-               }
-       } catch (Exception e) {                 
-       }
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("transaction", transaction);
        StompFrame frame = new StompFrame("ABORT", headers);

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=804943&r1=804942&r2=804943&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
 Mon Aug 17 11:43:34 2009
@@ -99,43 +99,20 @@
         protocolConverter.getTransportFilter().sendToStomp(command);
     }
     
-    synchronized void onStompAbort(TransactionId transactionId) throws 
IOException, JMSException {
-       //ack all unacked messages
-       for (MessageDispatch md : dispatchedMessage.values()) {
-               if (!unconsumedMessage.contains(md)) {
-               MessageAck ack = new MessageAck();
-               ack.setDestination(consumerInfo.getDestination());
-               ack.setConsumerId(consumerInfo.getConsumerId());
-               ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
-               ack.setFirstMessageId(md.getMessage().getMessageId());
-               ack.setLastMessageId(md.getMessage().getMessageId());
-               ack.setMessageCount(1);
-               ack.setTransactionId(transactionId);
-               protocolConverter.getTransportFilter().sendToActiveMQ(ack);
-               unconsumedMessage.add(md);
-               }
-       }
-       // redeliver all unconsumed messages
-       for (MessageDispatch md : unconsumedMessage) {
-               onMessageDispatch(md);
-       }
+    synchronized void onStompAbort(TransactionId transactionId) {
+       unconsumedMessage.clear();
     }
     
     synchronized void onStompCommit(TransactionId transactionId) {
-       // ack all messages
-       if (!unconsumedMessage.isEmpty()) {
-               MessageAck ack = new MessageAck();
-               ack.setDestination(consumerInfo.getDestination());
-               ack.setConsumerId(consumerInfo.getConsumerId());
-               ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
-               
ack.setFirstMessageId(unconsumedMessage.getFirst().getMessage().getMessageId());
-               
ack.setLastMessageId(unconsumedMessage.getLast().getMessage().getMessageId());
-               ack.setMessageCount(unconsumedMessage.size());
-               ack.setTransactionId(transactionId);
-               protocolConverter.getTransportFilter().sendToActiveMQ(ack);
-               // clear lists
-               unconsumedMessage.clear();
+       for (Iterator iter = dispatchedMessage.entrySet().iterator(); 
iter.hasNext();) {
+            Map.Entry entry = (Entry)iter.next();
+            MessageId id = (MessageId)entry.getKey();
+            MessageDispatch msg = (MessageDispatch)entry.getValue();
+            if (unconsumedMessage.contains(msg)) {
+               iter.remove();
+            }
        }
+       unconsumedMessage.clear();
     }
 
     synchronized MessageAck onStompMessageAck(String messageId, TransactionId 
transactionId) {
@@ -151,11 +128,7 @@
         ack.setConsumerId(consumerInfo.getConsumerId());
 
         if (ackMode == CLIENT_ACK) {
-               if (transactionId != null) {
-                       ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
-               } else {
-                       ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
-               }
+               ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
             int count = 0;
             for (Iterator iter = dispatchedMessage.entrySet().iterator(); 
iter.hasNext();) {
 
@@ -168,10 +141,12 @@
                 }
                 
                 if (transactionId != null) {
-                       if (!unconsumedMessage.contains(msg))
+                       if (!unconsumedMessage.contains(msg)) {
                                unconsumedMessage.add(msg);
+                       }
+                } else {
+                       iter.remove();
                 }
-                iter.remove();
                 
                 
                 count++;

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=804943&r1=804942&r2=804943&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
 Mon Aug 17 11:43:34 2009
@@ -977,24 +977,24 @@
         
         stompConnection.begin("tx2");
         
+        // Previously delivered message need to get re-acked...
+        stompConnection.ack(frame, "tx2");
+        stompConnection.ack(frame1, "tx2");
+        
         StompFrame frame3 = stompConnection.receive();
-        assertEquals(frame3.getBody(), "message 1");
+        assertEquals(frame3.getBody(), "message 3");
         stompConnection.ack(frame3, "tx2");
         
         StompFrame frame4 = stompConnection.receive();
-        assertEquals(frame4.getBody(), "message 2");
+        assertEquals(frame4.getBody(), "message 4");
         stompConnection.ack(frame4, "tx2");
         
-        StompFrame frame5 = stompConnection.receive();
-        assertEquals(frame5.getBody(), "message 3");        
-        stompConnection.ack(frame5, "tx2");
-        
         stompConnection.commit("tx2");
         
         stompConnection.begin("tx3");
-        StompFrame frame6 = stompConnection.receive();
-        assertEquals(frame6.getBody(), "message 4");
-        stompConnection.ack(frame6, "tx3");
+        StompFrame frame5 = stompConnection.receive();
+        assertEquals(frame5.getBody(), "message 5");
+        stompConnection.ack(frame5, "tx3");
         stompConnection.commit("tx3");
         
         stompDisconnect();


Reply via email to