Author: chirino
Date: Mon Apr 17 08:36:04 2006
New Revision: 394710

URL: http://svn.apache.org/viewcvs?rev=394710&view=rev
Log:
Missing synchronization could cause acks not to be delivered to the broker.
After enough acks were missed, the consumer would stop receiving messages
because the broker believed the consumer's prefetch was full.

Modified:
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscription.java
    
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscription.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscription.java?rev=394710&r1=394709&r2=394710&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscription.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscription.java
 Mon Apr 17 08:36:04 2006
@@ -93,7 +93,7 @@
         out.write(builder.toFrame());
     }
 
-    private void addMessageDispatch(MessageDispatch md) {
+    synchronized private void addMessageDispatch(MessageDispatch md) {
         dispatchedMessages.addLast(md);
     }
 
@@ -117,7 +117,7 @@
         return subscriptionId;
     }
 
-    public MessageAck createMessageAck(String message_id) {
+    synchronized public MessageAck createMessageAck(String message_id) {
         MessageAck ack = new MessageAck();
         ack.setDestination(consumerInfo.getDestination());
         ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
@@ -136,6 +136,7 @@
             count++;
             if( id.equals(message_id)  ) {
                 ack.setLastMessageId(md.getMessage().getMessageId());
+                break;
             }
         }
         ack.setMessageCount(count);

Modified: 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=394710&r1=394709&r2=394710&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
 Mon Apr 17 08:36:04 2006
@@ -96,6 +96,8 @@
             if( c < 0 ) {
                 throw new IOException("socket closed.");
             } else if( c == 0 ) {
+                c = is.read();
+                assertEquals("Expecting stomp frame to terminate with \0\n", 
c, '\n');
                 byte[] ba = inputBuffer.toByteArray();
                 inputBuffer.reset();
                 return new String(ba, "UTF-8");


Reply via email to