Author: chirino
Date: Wed May 29 23:18:48 2013
New Revision: 1487674
URL: http://svn.apache.org/r1487674
Log:
Reduce the number of dispatch sources used by a queue.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1487674&r1=1487673&r2=1487674&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed May 29 23:18:48 2013
@@ -31,6 +31,7 @@ import org.apache.activemq.apollo.dto._
import java.util.regex.Pattern
import collection.mutable.ListBuffer
import org.fusesource.hawtbuf.Buffer
+import org.apache.activemq.apollo.broker.{DeliveryResult, Subscription}
object Queue extends Log {
val subscription_counter = new AtomicInteger(0)
@@ -103,14 +104,6 @@ class Queue(val router: LocalRouter, val
debug("created queue: " + id)
- override def dispose: Unit = {
- ack_source.cancel
- }
-
- val ack_source = createSource(new ListEventAggregator[(Subscription#AcquiredQueueEntry, DeliveryResult, StoreUOW)](), dispatch_queue)
- ack_source.setEventHandler(^ {drain_acks});
- ack_source.resume
-
val session_manager = new SessionSinkMux[Delivery](messages, dispatch_queue, Delivery, Integer.MAX_VALUE, 1024*640) {
override def time_stamp = now
}
@@ -217,9 +210,17 @@ class Queue(val router: LocalRouter, val
var individual_swapped_items = 0
- val swap_source = createSource(EventAggregators.INTEGER_ADD, dispatch_queue)
- swap_source.setEventHandler(^{ swap_messages });
- swap_source.resume
+ var swap_triggered = false
+ def trigger_swap = {
+ dispatch_queue.assertExecuting()
+ if( tune_swap && !swap_triggered ) {
+ swap_triggered = true
+ defer {
+ swap_triggered = false
+ swap_messages
+ }
+ }
+ }
var restored_from_store = false
@@ -825,12 +826,6 @@ class Queue(val router: LocalRouter, val
}
}
- def trigger_swap = {
- if( tune_swap ) {
- swap_source.merge(1)
- }
- }
-
var keep_up_delivery_rate = 0L
def swap_messages:Unit = {
@@ -1168,40 +1163,38 @@ class Queue(val router: LocalRouter, val
}
}
- def drain_acks = might_unfill {
- val end = System.nanoTime()
- ack_source.getData.foreach {
- case (entry, consumed, uow) =>
- consumed match {
- case Consumed =>
- entry.ack(uow)
- case Expired=>
- val actual = create_uow(uow)
- expired(actual, entry.entry) {
- entry.ack(actual)
+ def process_ack(entry:Subscription#AcquiredQueueEntry, consumed:DeliveryResult, uow:StoreUOW) = defer {
+ might_unfill {
+ consumed match {
+ case Consumed =>
+ entry.ack(uow)
+ case Expired=>
+ val actual = create_uow(uow)
+ expired(actual, entry.entry) {
+ entry.ack(actual)
+ }
+ actual.release
+ case Delivered =>
+ entry.increment_nack
+ entry.entry.redelivered
+ entry.nack
+ case Undelivered =>
+ entry.nack
+ case Poisoned =>
+ entry.increment_nack
+ entry.entry.redelivered
+ var limit = dlq_nak_limit
+ if( limit>0 && entry.entry.redelivery_count >= limit ) {
+ dead_letter(uow, entry.entry) { uow =>
+ entry.remove(uow)
}
- actual.release
- case Delivered =>
- entry.increment_nack
- entry.entry.redelivered
- entry.nack
- case Undelivered =>
+ } else {
entry.nack
- case Poisoned =>
- entry.increment_nack
- entry.entry.redelivered
- var limit = dlq_nak_limit
- if( limit>0 && entry.entry.redelivery_count >= limit ) {
- dead_letter(uow, entry.entry) { uow =>
- entry.remove(uow)
- }
- } else {
- entry.nack
- }
- }
- if( uow!=null ) {
- uow.release
- }
+ }
+ }
+ if( uow!=null ) {
+ uow.release
+ }
}
}
@@ -1368,35 +1361,6 @@ class Queue(val router: LocalRouter, val
rc
}
- val swap_out_completes_source = createSource(new ListEventAggregator[Task](), dispatch_queue)
- swap_out_completes_source.setEventHandler(^ {drain_swap_out_completes});
- swap_out_completes_source.resume
-
- def drain_swap_out_completes() = might_unfill {
- val data = swap_out_completes_source.getData
- data.foreach { loaded =>
- loaded.run()
- }
- }
-
- val store_load_source = createSource(new ListEventAggregator[(QueueEntry#Swapped, MessageRecord)](), dispatch_queue)
- store_load_source.setEventHandler(^ {drain_store_loads});
- store_load_source.resume
-
-
- def drain_store_loads() = {
- val data = store_load_source.getData
- data.foreach { case (swapped,message_record) =>
- swapped.swapped_in(message_record)
- }
-
- data.foreach { case (swapped,_) =>
- if( swapped.entry.hasSubs ) {
- swapped.entry.task.run
- }
- }
- }
-
}
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1487674&r1=1487673&r2=1487674&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Wed May 29 23:18:48 2013
@@ -448,13 +448,13 @@ class QueueEntry(val queue:Queue, val se
uow.enqueue(toQueueEntryRecord)
queue.swapping_out_size+=size
uow.on_flush { canceled =>
- queue.swap_out_completes_source.merge(^{
+ queue.defer {
this.swapped_out(!canceled)
queue.swapping_out_size-=size
if( queue.swapping_out_size==0 ) {
queue.on_queue_flushed
}
- })
+ }
}
}
}
@@ -671,7 +671,7 @@ class QueueEntry(val queue:Queue, val se
if( uow!=null ) {
uow.retain
}
- queue.ack_source.merge((acquiredQueueEntry, consumed, uow))
+ queue.process_ack(acquiredQueueEntry, consumed, uow)
}
val accepted = sub.offer(acquiredDelivery)
@@ -760,7 +760,12 @@ class QueueEntry(val queue:Queue, val se
// pass off to a source so it can aggregate multiple
// loads to reduce cross thread synchronization
if( delivery.isDefined ) {
- queue.store_load_source.merge((this, delivery.get))
+ queue.defer {
+ swapped_in(delivery.get)
+ if( entry.hasSubs ) {
+ entry.task.run
+ }
+ }
} else {
warn("Queue '%s' detected store dropped message at seq: %d", queue.id, seq)