Author: chirino
Date: Mon Jul 30 20:41:46 2012
New Revision: 1367282

URL: http://svn.apache.org/viewvc?rev=1367282&view=rev
Log:
Tighten up queue prefetching for openwire.

Modified:
    
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala

Modified: 
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1367282&r1=1367281&r2=1367282&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Mon Jul 30 20:41:46 2012
@@ -811,7 +811,6 @@ class OpenwireProtocolHandler extends Pr
     val consumer_sink = sink_manager.open()
     val credit_window_filter = new CreditWindowFilter[(Session[Delivery], Delivery)](consumer_sink.map { event =>
       val (session, delivery) = event
-      session_manager.delivered(session, delivery.size)
       val dispatch = new MessageDispatch
       dispatch.setConsumerId(info.getConsumerId)
       if( delivery.message eq EndOfBrowseMessage ) {
@@ -819,7 +818,7 @@ class OpenwireProtocolHandler extends Pr
         dispatch
       } else {
         var msg = delivery.message.asInstanceOf[OpenwireMessage].message
-        ack_handler.track(msg.getMessageId, delivery.ack)
+        ack_handler.track(msg.getMessageId, delivery.ack, session, delivery.size)
         dispatch.setDestination(msg.getDestination)
         dispatch.setMessage(msg)
       }
@@ -1015,7 +1014,7 @@ class OpenwireProtocolHandler extends Pr
 
     def connect(p:DeliveryProducer) = new OpenwireConsumerSession(p)
 
-    class TrackedAck(val ack:(DeliveryResult, StoreUOW)=>Unit) {
+    class TrackedAck(val ack:(DeliveryResult, StoreUOW)=>Unit, val session:Session[Delivery], val size:Int) {
       var credited = false
     }
 
@@ -1041,7 +1040,7 @@ class OpenwireProtocolHandler extends Pr
         consumer_acks = null
       }
 
-      def track(msgid:MessageId, ack:(DeliveryResult, StoreUOW)=>Unit) = {
+      def track(msgid:MessageId, ack:(DeliveryResult, StoreUOW)=>Unit, session:Session[Delivery], size:Int) = {
         queue.assertExecuting()
         if( consumer_acks==null ) {
           // It can happen if we get closed.. but destination is still sending data..
@@ -1049,7 +1048,7 @@ class OpenwireProtocolHandler extends Pr
             ack(Undelivered, null)
           }
         } else {
-          consumer_acks += msgid -> new TrackedAck(ack)
+          consumer_acks += msgid -> new TrackedAck(ack, session, size)
         }
       }
 
@@ -1059,6 +1058,7 @@ class OpenwireProtocolHandler extends Pr
         if( messageAck.getAckType == MessageAck.INDIVIDUAL_ACK_TYPE) {
           for( (id, delivery) <- consumer_acks.find(_._1 == msgid) ) {
             if ( !delivery.credited ) {
+              session_manager.delivered(delivery.session, delivery.size)
               ack_source.merge(1)
               delivery.credited = true;
             }
@@ -1077,6 +1077,7 @@ class OpenwireProtocolHandler extends Pr
           for( (id, delivery) <- acked ) {
             // only credit once...
             if( !delivery.credited ) {
+              session_manager.delivered(delivery.session, delivery.size)
               ack_source.merge(1)
               delivery.credited = true;
             }


Reply via email to