Author: chirino
Date: Wed May 29 23:19:13 2013
New Revision: 1487677
URL: http://svn.apache.org/r1487677
Log:
Optimize the range ack case a bit when the first message is being acked. This
should lower CPU usage, as we no longer need to iterate over all the messages
sent to the consumer.
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1487677&r1=1487676&r2=1487677&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Wed May 29 23:19:13 2013
@@ -1101,14 +1101,20 @@ class OpenwireProtocolHandler extends Pr
}
}
} else {
- var found = false
- val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
- if( id == msgid ) {
- found = true
- true
- } else {
- !found
+
+ val acked = if( !consumer_acks.isEmpty &&
consumer_acks.headOption.get._1 == msgid ) {
+ Seq(consumer_acks.headOption.get)
+ } else {
+ var found = false
+ val (acked, _) = consumer_acks.partition{ case (id, ack)=>
+ if( id == msgid ) {
+ found = true
+ true
+ } else {
+ !found
+ }
}
+ acked
}
for( (id, delivery) <- acked ) {
@@ -1150,25 +1156,29 @@ class OpenwireProtocolHandler extends Pr
}
}
} else {
- // session acks ack all previously received messages..
- var found = false
- val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
- if( id == msgid ) {
- found = true
- true
- } else {
- !found
+ val acked = if( !consumer_acks.isEmpty &&
consumer_acks.headOption.get._1 == msgid ) {
+ Seq(consumer_acks.remove(0))
+ } else {
+ // session acks ack all previously received messages..
+ var found = false
+ val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
+ if( id == msgid ) {
+ found = true
+ true
+ } else {
+ !found
+ }
+ }
+ if( !found ) {
+ trace("%s: ACK failed, invalid message id: %s, dest:
%s".format(security_context.remote_address, msgid, addresses.mkString(",")))
}
+ consumer_acks = not_acked
+ acked
}
- if( !found ) {
- trace("%s: ACK failed, invalid message id: %s, dest:
%s".format(security_context.remote_address, msgid, addresses.mkString(",")))
- } else {
- consumer_acks = not_acked
- acked.foreach{case (id, delivery)=>
- if( delivery.ack!=null ) {
- delivery.ack(consumed, uow)
- }
+ acked.foreach{case (id, delivery)=>
+ if( delivery.ack!=null ) {
+ delivery.ack(consumed, uow)
}
}
}
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1487677&r1=1487676&r2=1487677&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Wed May 29 23:19:13 2013
@@ -16,30 +16,28 @@
*/
package org.apache.activemq.apollo.stomp
-import _root_.org.fusesource.hawtbuf._
+import org.fusesource.hawtbuf._
+import collection.mutable.{ListBuffer, HashMap}
import dto.{StompConnectionStatusDTO, StompDTO}
-import org.fusesource.hawtdispatch._
-
-import org.apache.activemq.apollo.broker._
-import Buffer._
+import java.io.IOException
import java.lang.String
-import protocol.{ProtocolFilter3, ProtocolHandler}
-import security.SecurityContext
-import Stomp._
-import org.apache.activemq.apollo.selector.SelectorParser
-import org.apache.activemq.apollo.filter.{BooleanExpression, FilterException}
-import org.apache.activemq.apollo.broker.store._
-import org.apache.activemq.apollo.util._
+import java.util
import java.util.concurrent.TimeUnit
-import java.util.Map.Entry
-import collection.mutable.{ListBuffer, HashMap}
-import java.io.IOException
+import language.implicitConversions
+import org.apache.activemq.apollo.broker._
+import org.apache.activemq.apollo.broker.store._
import org.apache.activemq.apollo.dto._
+import org.apache.activemq.apollo.filter.{BooleanExpression, FilterException}
+import org.apache.activemq.apollo.selector.SelectorParser
+import org.apache.activemq.apollo.util._
+import org.apache.activemq.apollo.util.path.LiteralPart
+import org.fusesource.hawtdispatch._
import org.fusesource.hawtdispatch.transport.HeartBeatMonitor
-import path.{LiteralPart, Path, PathParser}
-import scala.Some
-import org.apache.activemq.apollo.broker.SubscriptionAddress
-import java.util
+import org.apache.activemq.apollo.util.path.{Path, PathParser}
+import org.apache.activemq.apollo.broker.protocol.{ProtocolFilter3,
ProtocolHandler}
+import org.apache.activemq.apollo.broker.security.SecurityContext
+import org.fusesource.hawtbuf.Buffer._
+import Stomp._
case class RichBuffer(self:Buffer) extends Proxy {
@@ -51,7 +49,6 @@ case class RichBuffer(self:Buffer) exten
}
}
-import language.implicitConversions
object BufferSupport {
implicit def to_rich_buffer(value:Buffer):RichBuffer = RichBuffer(value)
}
@@ -153,6 +150,7 @@ object StompProtocolHandler extends Log
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
class StompProtocolHandler extends ProtocolHandler {
+
import StompProtocolHandler._
var connection_log:Log = StompProtocolHandler
@@ -363,14 +361,20 @@ class StompProtocolHandler extends Proto
def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit = {
queue.assertExecuting()
if( initial_credit_window.auto_credit ) {
- var found = false
- val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
- if( id == msgid ) {
- found = true
- true
- } else {
- !found
+
+ val acked = if( !consumer_acks.isEmpty &&
consumer_acks.headOption.get._1 == msgid ) {
+ Seq(consumer_acks.headOption.get)
+ } else {
+ var found = false
+ val (acked, _) = consumer_acks.partition{ case (id, ack)=>
+ if( id == msgid ) {
+ found = true
+ true
+ } else {
+ !found
+ }
}
+ acked
}
for( (id, delivery) <- acked ) {
@@ -391,25 +395,29 @@ class StompProtocolHandler extends Proto
queue.assertExecuting()
assert(consumer_acks !=null)
- // session acks ack all previously received messages..
- var found = false
- val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
- if( id == msgid ) {
- found = true
- true
- } else {
- !found
+ val acked = if( !consumer_acks.isEmpty &&
consumer_acks.headOption.get._1 == msgid ) {
+ Seq(consumer_acks.remove(0))
+ } else {
+ // session acks ack all previously received messages..
+ var found = false
+ val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
+ if( id == msgid ) {
+ found = true
+ true
+ } else {
+ !found
+ }
+ }
+ if( !found ) {
+ trace("%s: ACK failed, invalid message id: %s, dest:
%s".format(security_context.remote_address, msgid, addresses.mkString(",")))
}
+ consumer_acks = not_acked
+ acked
}
- if( !found ) {
- trace("%s: ACK failed, invalid message id: %s, dest:
%s".format(security_context.remote_address, msgid, addresses.mkString(",")))
- } else {
- consumer_acks = not_acked
- acked.foreach{case (id, delivery)=>
- if( delivery.ack!=null ) {
- delivery.ack(consumed, uow)
- }
+ acked.foreach{case (id, delivery)=>
+ if( delivery.ack!=null ) {
+ delivery.ack(consumed, uow)
}
}
@@ -751,6 +759,7 @@ class StompProtocolHandler extends Proto
override def set_connection(connection: BrokerConnection) = {
super.set_connection(connection)
+ import OptionSupport._
import collection.JavaConversions._
codec = connection.protocol_codec(classOf[StompCodec])
@@ -759,7 +768,6 @@ class StompProtocolHandler extends Proto
protocol_filters =
ProtocolFilter3.create_filters(config.protocol_filters.toList, this)
- import OptionSupport._
Option(config.max_data_length).map(MemoryPropertyEditor.parse(_).toInt).foreach(
codec.max_data_length = _ )
Option(config.max_header_length).map(MemoryPropertyEditor.parse(_).toInt).foreach(
codec.max_header_length = _ )
config.max_headers.foreach( codec.max_headers = _ )