Author: chirino
Date: Wed Apr 13 17:19:49 2011
New Revision: 1091853
URL: http://svn.apache.org/viewvc?rev=1091853&view=rev
Log:
Better dispatch execution assertion checking to catch usage errors.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1091853&r1=1091852&r2=1091853&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Apr 13 17:19:49 2011
@@ -674,7 +674,7 @@ class QueueEntry(val queue:Queue, val se
* Dispatches this entry to the consumers and continues dispatching subsequent
* entries as long as the dispatch results in advancing in their dispatch position.
*/
- def run() = {
+ def run() = queue.dispatch_queue {
var next = this;
while( next!=null && next.dispatch) {
next = next.getNext
@@ -1018,6 +1018,8 @@ class QueueEntry(val queue:Queue, val se
override def dispatch():Boolean = {
+ queue.assert_executing
+
// Nothing to dispatch if we don't have subs..
if( parked.isEmpty ) {
return false
@@ -1328,7 +1330,7 @@ object Subscription extends Log
* tracks the entries which the consumer has acquired.
*
*/
-class Subscription(val queue:Queue, val consumer:DeliveryConsumer) extends DeliveryProducer {
+class Subscription(val queue:Queue, val consumer:DeliveryConsumer) extends DeliveryProducer with Dispatched {
import Subscription._
def dispatch_queue = queue.dispatch_queue
@@ -1501,6 +1503,11 @@ class Subscription(val queue:Queue, val
acquired_size += entry.size
def ack(sb:StoreUOW):Unit = {
+ assert_executing
+ if(!isLinked) {
+ warn("Internal protocol error: message delivery acked/nacked multiple times: "+entry.seq)
+ return
+ }
// The session may have already been closed..
if( session == null ) {
return;
@@ -1539,6 +1546,11 @@ class Subscription(val queue:Queue, val
}
def nack:Unit = {
+ assert_executing
+ if(!isLinked) {
+ warn("Internal protocol error: message delivery acked/nacked multiple times: "+entry.seq)
+ return
+ }
// The session may have already been closed..
if( session == null ) {
return;
Modified:
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala?rev=1091853&r1=1091852&r2=1091853&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala Wed Apr 13 17:19:49 2011
@@ -28,7 +28,7 @@ import org.fusesource.hawtdispatch._
trait Dispatched {
def dispatch_queue:DispatchQueue
- protected def assert_executing = assert( dispatch_queue.isExecuting,
+ def assert_executing = assert( dispatch_queue.isExecuting,
"Dispatch queue '%s' was not executing, (currently executing: %s)".format(
Option(dispatch_queue.getLabel).getOrElse(""),
Option(getCurrentQueue).map(_.getLabel).getOrElse("None") )