Author: chirino
Date: Tue Mar 29 15:06:07 2011
New Revision: 1086622
URL: http://svn.apache.org/viewvc?rev=1086622&view=rev
Log:
- Renamed host to virtual_host.
- Added a Dispatched interface.
- Replaced the MapSink object with a map method on the Sink trait.
- Added SinkFilter and MutableSink helper classes.
Added:
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1086622&r1=1086621&r2=1086622&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
Tue Mar 29 15:06:07 2011
@@ -32,7 +32,10 @@ import java.util.concurrent.TimeUnit
trait DomainDestination {
def id:Long
- def name:String
+ def virtual_host:VirtualHost
+
+ def destination_dto:DestinationDTO
+ def name:String = destination_dto.name
def bind (destination:DestinationDTO, consumer:DeliveryConsumer)
def unbind (consumer:DeliveryConsumer, persistent:Boolean)
@@ -91,14 +94,14 @@ object LocalRouter extends Log {
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class LocalRouter(val host:VirtualHost) extends BaseService with Router {
+class LocalRouter(val virtual_host:VirtualHost) extends BaseService with
Router with Dispatched {
import LocalRouter._
- def dispatch_queue:DispatchQueue = host.dispatch_queue
+ def dispatch_queue:DispatchQueue = virtual_host.dispatch_queue
def auto_create_destinations = {
import OptionSupport._
- host.config.auto_create_destinations.getOrElse(true)
+ virtual_host.config.auto_create_destinations.getOrElse(true)
}
private val ALL = new Path({
@@ -300,14 +303,14 @@ class LocalRouter(val host:VirtualHost)
import collection.JavaConversions._
import DestinationParser.default._
import AsciiBuffer._
- host.config.topics.find( x=> parseFilter(ascii(x.name)).matches(name)
).getOrElse(new TopicDTO)
+ virtual_host.config.topics.find( x=>
parseFilter(ascii(x.name)).matches(name) ).getOrElse(new TopicDTO)
}
def can_create_ds(config:DurableSubscriptionDTO, security:SecurityContext)
= {
- if( host.authorizer==null || security==null) {
+ if( virtual_host.authorizer==null || security==null) {
true
} else {
- host.authorizer.can_create(security, host, config)
+ virtual_host.authorizer.can_create(security, virtual_host, config)
}
}
@@ -344,27 +347,27 @@ class LocalRouter(val host:VirtualHost)
// A new destination is being created...
val dto = topic_config(path)
- if( host.authorizer!=null && security!=null &&
!host.authorizer.can_create(security, host, dto)) {
+ if( virtual_host.authorizer!=null && security!=null &&
!virtual_host.authorizer.can_create(security, virtual_host, dto)) {
return new Failure("Not authorized to create the destination")
}
val id = topic_id_counter.incrementAndGet
- val topic = new Topic(LocalRouter.this,
DestinationParser.encode_path(path), dto, id)
+ val topic = new Topic(LocalRouter.this,
destination.asInstanceOf[TopicDestinationDTO], dto, id)
add_destination(path, topic)
Success(topic)
}
def can_bind_one(path:Path, destination:DestinationDTO,
consumer:DeliveryConsumer, security:SecurityContext):Boolean = {
val config = topic_config(path)
- val authorizer = host.authorizer
- if( authorizer!=null && security!=null &&
!authorizer.can_receive_from(security, host, config) ) {
+ val authorizer = virtual_host.authorizer
+ if( authorizer!=null && security!=null &&
!authorizer.can_receive_from(security, virtual_host, config) ) {
return false;
}
destination match {
case destination:DurableSubscriptionDestinationDTO=>
// So the user can subscribe to the topic.. but can he create
durable sub??
- val qc =
DurableSubscriptionQueueBinding.create(destination).config(host).asInstanceOf[DurableSubscriptionDTO]
+ val qc =
DurableSubscriptionQueueBinding.create(destination).config(virtual_host).asInstanceOf[DurableSubscriptionDTO]
if( !can_create_ds(qc, security) ) {
return false;
}
@@ -375,8 +378,8 @@ class LocalRouter(val host:VirtualHost)
def can_connect_one(path:Path, destination:DestinationDTO,
producer:BindableDeliveryProducer, security:SecurityContext):Boolean = {
val config = topic_config(path)
- val authorizer = host.authorizer
- !(authorizer!=null && security!=null &&
!authorizer.can_send_to(security, host, config) )
+ val authorizer = virtual_host.authorizer
+ !(authorizer!=null && security!=null &&
!authorizer.can_send_to(security, virtual_host, config) )
}
}
@@ -385,10 +388,10 @@ class LocalRouter(val host:VirtualHost)
class QueueDomain extends Domain[Queue] {
def can_create_queue(config:QueueDTO, security:SecurityContext) = {
- if( host.authorizer==null || security==null) {
+ if( virtual_host.authorizer==null || security==null) {
true
} else {
- host.authorizer.can_create(security, host, config)
+ virtual_host.authorizer.can_create(security, virtual_host, config)
}
}
@@ -423,7 +426,7 @@ class LocalRouter(val host:VirtualHost)
dto.name = DestinationParser.encode_path(path)
val binding = QueueDomainQueueBinding.create(dto)
- val config = binding.config(host)
+ val config = binding.config(virtual_host)
if( can_create_queue(config, security) ) {
Success(_create_queue(binding))
} else {
@@ -434,14 +437,14 @@ class LocalRouter(val host:VirtualHost)
def can_bind_one(path:Path, dto:DestinationDTO, consumer:DeliveryConsumer,
security: SecurityContext):Boolean = {
val binding = QueueDomainQueueBinding.create(dto)
- val config = binding.config(host)
- if( host.authorizer!=null && security!=null ) {
+ val config = binding.config(virtual_host)
+ if( virtual_host.authorizer!=null && security!=null ) {
if( consumer.browser ) {
- if( !host.authorizer.can_receive_from(security, host, config) ) {
+ if( !virtual_host.authorizer.can_receive_from(security,
virtual_host, config) ) {
return false;
}
} else {
- if( !host.authorizer.can_consume_from(security, host, config) ) {
+ if( !virtual_host.authorizer.can_consume_from(security,
virtual_host, config) ) {
return false
}
}
@@ -451,9 +454,9 @@ class LocalRouter(val host:VirtualHost)
def can_connect_one(path:Path, dto:DestinationDTO,
producer:BindableDeliveryProducer, security:SecurityContext):Boolean = {
val binding = QueueDomainQueueBinding.create(dto)
- val config = binding.config(host)
- val authorizer = host.authorizer
- !( authorizer!=null && security!=null &&
!authorizer.can_send_to(security, host, config) )
+ val config = binding.config(virtual_host)
+ val authorizer = virtual_host.authorizer
+ !( authorizer!=null && security!=null &&
!authorizer.can_send_to(security, virtual_host, config) )
}
}
@@ -465,21 +468,21 @@ class LocalRouter(val host:VirtualHost)
/////////////////////////////////////////////////////////////////////////////
protected def _start(on_completed: Runnable) = {
- val tracker = new LoggingTracker("router startup", host.console_log,
dispatch_queue)
- if( host.store!=null ) {
+ val tracker = new LoggingTracker("router startup",
virtual_host.console_log, dispatch_queue)
+ if( virtual_host.store!=null ) {
val task = tracker.task("list_queues")
- host.store.list_queues { queue_keys =>
+ virtual_host.store.list_queues { queue_keys =>
for( queue_key <- queue_keys) {
val task = tracker.task("load queue: "+queue_key)
// Use a global queue to so we concurrently restore
// the queues.
globalQueue {
- host.store.get_queue(queue_key) { x =>
+ virtual_host.store.get_queue(queue_key) { x =>
x match {
case Some(record)=>
if( record.binding_kind == TempQueueBinding.TEMP_KIND ) {
// Drop temp queues on restart..
- host.store.remove_queue(queue_key){x=> task.run}
+ virtual_host.store.remove_queue(queue_key){x=> task.run}
} else {
dispatch_queue {
_create_queue(QueueBinding.create(record.binding_kind,
record.binding_data), queue_key)
@@ -496,7 +499,7 @@ class LocalRouter(val host:VirtualHost)
}
import OptionSupport._
- if(host.config.regroup_connections.getOrElse(false)) {
+ if(virtual_host.config.regroup_connections.getOrElse(false)) {
schedule_connection_regroup
}
@@ -504,7 +507,7 @@ class LocalRouter(val host:VirtualHost)
}
protected def _stop(on_completed: Runnable) = {
- val tracker = new LoggingTracker("router shutdown", host.console_log,
dispatch_queue)
+ val tracker = new LoggingTracker("router shutdown",
virtual_host.console_log, dispatch_queue)
queues_by_id.valuesIterator.foreach { queue=>
tracker.stop(queue)
}
@@ -568,7 +571,6 @@ class LocalRouter(val host:VirtualHost)
def domain(destination: DestinationDTO):Domain[_ <: DomainDestination] =
destination match {
case x:TopicDestinationDTO => topic_domain
- case x:DurableSubscriptionDestinationDTO => topic_domain
case x:QueueDestinationDTO => queue_domain
case _ => throw new RuntimeException("Unknown domain type:
"+destination.getClass)
}
@@ -673,10 +675,10 @@ class LocalRouter(val host:VirtualHost)
var qid = id
if( qid == -1 ) {
- qid = host.queue_id_counter.incrementAndGet
+ qid = virtual_host.queue_id_counter.incrementAndGet
}
- val config = binding.config(host)
+ val config = binding.config(virtual_host)
val queue = new Queue(this, qid, binding, config)
if( queue.tune_persistent && id == -1 ) {
@@ -686,7 +688,7 @@ class LocalRouter(val host:VirtualHost)
record.binding_data = binding.binding_data
record.binding_kind = binding.binding_kind
- host.store.add_queue(record) { rc => Unit }
+ virtual_host.store.add_queue(record) { rc => Unit }
}
@@ -731,7 +733,7 @@ class LocalRouter(val host:VirtualHost)
def _destroy_queue(queue:Queue, security:SecurityContext):Result[Zilch,
String] = {
if( security!=null && queue.config.acl!=null ) {
- if( !host.authorizer.can_destroy(security, host, queue.config) ) {
+ if( !virtual_host.authorizer.can_destroy(security, virtual_host,
queue.config) ) {
return Failure("Not authorized to destroy")
}
}
@@ -742,7 +744,7 @@ class LocalRouter(val host:VirtualHost)
queue.stop
if( queue.tune_persistent ) {
queue.dispatch_queue {
- host.store.remove_queue(queue.id){x=> Unit}
+ virtual_host.store.remove_queue(queue.id){x=> Unit}
}
}
Success(Zilch)
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1086622&r1=1086621&r2=1086622&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Tue Mar 29 15:06:07 2011
@@ -44,9 +44,9 @@ import Queue._
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class Queue(val router: LocalRouter, val id:Long, val binding:QueueBinding,
var config:QueueDTO) extends BaseRetained with BindableDeliveryProducer with
DeliveryConsumer with BaseService with DomainDestination {
+class Queue(val router: LocalRouter, val id:Long, val binding:QueueBinding,
var config:QueueDTO) extends BaseRetained with BindableDeliveryProducer with
DeliveryConsumer with BaseService with DomainDestination with Dispatched {
- def host = router.host
+ def virtual_host = router.virtual_host
var inbound_sessions = Set[DeliverySession]()
var all_subscriptions = Map[DeliveryConsumer, Subscription]()
@@ -55,12 +55,15 @@ class Queue(val router: LocalRouter, val
val filter = binding.message_filter
override val dispatch_queue: DispatchQueue = createQueue(binding.label);
- host.broker.init_dispatch_queue(dispatch_queue)
+ virtual_host.broker.init_dispatch_queue(dispatch_queue)
+
+ def destination_dto: DestinationDTO = binding.binding_dto
dispatch_queue {
debug("created queue for: " + binding.label)
}
+
override def dispose: Unit = {
ack_source.cancel
}
@@ -123,7 +126,7 @@ class Queue(val router: LocalRouter, val
def configure(c:QueueDTO) = {
config = c
- tune_persistent = host.store !=null && config.persistent.getOrElse(true)
+ tune_persistent = virtual_host.store !=null &&
config.persistent.getOrElse(true)
tune_swap = tune_persistent && config.swap.getOrElse(true)
tune_swap_range_size = config.swap_range_size.getOrElse(10000)
tune_consumer_buffer = config.consumer_buffer.getOrElse(32*1024)
@@ -187,7 +190,7 @@ class Queue(val router: LocalRouter, val
if( tune_persistent ) {
- host.store.list_queue_entry_ranges(id, tune_swap_range_size) { ranges=>
+ virtual_host.store.list_queue_entry_ranges(id, tune_swap_range_size) {
ranges=>
dispatch_queue {
if( ranges!=null && !ranges.isEmpty ) {
@@ -502,13 +505,13 @@ class Queue(val router: LocalRouter, val
def connected() = {}
def bind(value: DeliveryConsumer, security:SecurityContext): Result[Zilch,
String] = {
- if( host.authorizer!=null && security!=null ) {
+ if( virtual_host.authorizer!=null && security!=null ) {
if( value.browser ) {
- if( !host.authorizer.can_receive_from(security, host, config) ) {
+ if( !virtual_host.authorizer.can_receive_from(security, virtual_host,
config) ) {
return new Failure("Not authorized to browse the queue")
}
} else {
- if( !host.authorizer.can_consume_from(security, host, config) ) {
+ if( !virtual_host.authorizer.can_consume_from(security, virtual_host,
config) ) {
return new Failure("Not authorized to consume from the queue")
}
}
@@ -561,8 +564,6 @@ class Queue(val router: LocalRouter, val
producer.unbind(this::Nil)
}
- def name: String = binding.label
-
override def connection:Option[BrokerConnection] = None
@@ -946,7 +947,7 @@ class QueueEntry(val queue:Queue, val se
// Are swapping out a non-persistent message?
if( delivery.storeKey == -1 ) {
- delivery.uow = queue.host.store.create_uow
+ delivery.uow = queue.virtual_host.store.create_uow
val uow = delivery.uow
delivery.storeKey = uow.store(delivery.createMessageRecord)
store
@@ -959,7 +960,7 @@ class QueueEntry(val queue:Queue, val se
} else {
if( asap ) {
- queue.host.store.flush_message(message_key) {
+ queue.virtual_host.store.flush_message(message_key) {
queue.swap_out_completes_source.merge(this)
}
}
@@ -1138,7 +1139,7 @@ class QueueEntry(val queue:Queue, val se
// start swapping in...
swapping_in = true
queue.swapping_in_size += size
- queue.host.store.load_message(message_key) { delivery =>
+ queue.virtual_host.store.load_message(message_key) { delivery =>
// pass off to a source so it can aggregate multiple
// loads to reduce cross thread synchronization
if( delivery.isDefined ) {
@@ -1242,7 +1243,7 @@ class QueueEntry(val queue:Queue, val se
override def swap_in() = {
if( !swapping_in ) {
swapping_in = true
- queue.host.store.list_queue_entries(queue.id, seq, last) { records =>
+ queue.virtual_host.store.list_queue_entries(queue.id, seq, last) {
records =>
if( !records.isEmpty ) {
queue.dispatch_queue {
@@ -1501,7 +1502,7 @@ class Subscription(val queue:Queue, val
total_ack_count += 1
if (entry.messageKey != -1) {
val storeBatch = if( sb == null ) {
- queue.host.store.create_uow
+ queue.virtual_host.store.create_uow
} else {
sb
}
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1086622&r1=1086621&r2=1086622&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
Tue Mar 29 15:06:07 2011
@@ -31,7 +31,7 @@ import ReporterLevel._
*/
trait Router extends Service {
- def host:VirtualHost
+ def virtual_host:VirtualHost
def get_queue(dto:Long):Option[Queue] @suspendable
@@ -197,7 +197,7 @@ abstract class DeliveryProducerRoute(val
if( copy.storeKey == -1L && target.consumer.is_persistent &&
copy.message.persistent ) {
if( copy.uow==null ) {
- copy.uow = router.host.store.create_uow
+ copy.uow = router.virtual_host.store.create_uow
} else {
copy.uow.retain
}
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1086622&r1=1086621&r2=1086622&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
Tue Mar 29 15:06:07 2011
@@ -49,24 +49,44 @@ trait Sink[T] {
* Sets a refiller on the sink. The refiller is executed
* when the sink is interested in receiving more deliveries.
*/
- var refiller:Runnable
+ def refiller:Runnable
+ def refiller_=(value:Runnable)
+ def map[Y](func: Y=>T ):Sink[Y] = {
+ def outer = Sink.this
+ new Sink[Y]() with SinkFilter {
+ def downstream = outer
+ def offer(value:Y) = {
+ if( full ) {
+ false
+ } else {
+ outer.offer(func(value))
+ }
+ }
+ }
+ }
}
+trait SinkFilter {
+ def downstream:Sink[_]
+ def refiller:Runnable = downstream.refiller
+ def refiller_=(value:Runnable) { downstream.refiller=value }
+ def full: Boolean = downstream.full
+}
/**
* <p>
* A delivery sink which is connected to a transport. It expects the caller's
dispatch
- * queue to be the same as the transport's/
+ * queue to be the same as the transport's
* <p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
class TransportSink(val transport:Transport) extends Sink[AnyRef] {
+ var refiller:Runnable = NOOP
def full:Boolean = transport.full
def offer(value:AnyRef) = transport.offer(value)
- var refiller:Runnable = null
}
/**
@@ -78,23 +98,24 @@ class TransportSink(val transport:Transp
*/
class OverflowSink[T](val downstream:Sink[T]) extends Sink[T] {
- private var overflow = new LinkedList[T]()
- var refiller: Runnable = null
+ var refiller:Runnable = NOOP
+
+ private var overflow = collection.mutable.Queue[T]()
def overflowed = !overflow.isEmpty
def full = overflowed || downstream.full
+ def clear = overflow.clear
+
downstream.refiller = ^{ drain }
protected def drain:Unit = {
while( overflowed ) {
- val delivery = overflow.removeFirst
- if( !downstream.offer(delivery) ) {
- overflow.addFirst(delivery)
+ if( !downstream.offer(overflow.front) ) {
return
} else {
- onDelivered(delivery)
+ onDelivered(overflow.dequeue)
}
}
// request a refill once the overflow is empty...
@@ -107,7 +128,7 @@ class OverflowSink[T](val downstream:Sin
*/
def offer(value:T) = {
if( overflowed || !downstream.offer(value)) {
- overflow.addLast(value)
+ overflow.enqueue(value)
} else {
onDelivered(value)
}
@@ -122,24 +143,37 @@ class OverflowSink[T](val downstream:Sin
}
-object MapSink {
- def apply[X,Y](downstream:Sink[X])(func: Y=>X ) = {
- new Sink[Y] {
- def refiller = downstream.refiller
- def refiller_=(value:Runnable) = downstream.refiller=value
- def full = downstream.full
- def offer(value:Y) = {
- if( full ) {
- false
- } else {
- downstream.offer(func(value))
- }
- }
+/**
+ * A sink that allows the downstream sink to be set to an
+ * optional sink.
+ */
+class MutableSink[T] extends Sink[T] {
+
+ var refiller:Runnable = NOOP
+ private var _downstream:Option[Sink[T]] =_
+
+ def downstream_=(value: Option[Sink[T]]) {
+ _downstream.foreach(d => d.refiller = NOOP )
+ _downstream = value
+ _downstream.foreach{d =>
+ d.refiller = refiller
+ refiller.run
}
}
+
+ def downstream = _downstream
+
+ def full = _downstream.map(_.full).getOrElse(true)
+
+ /**
+ * @return true always even when full since those messages just get stored
in a
+ * overflow list
+ */
+ def offer(value:T) = ! _downstream.map(_.offer(value)).getOrElse(false)
}
+
/**
* <p>
* A SinkMux multiplexes access to a target sink so that multiple
@@ -156,8 +190,7 @@ class SinkMux[T](val downstream:Sink[T],
var sessions = HashSet[Session[T]]()
var session_max_credits = 1024*32;
- val overflow = new OverflowSink[(Session[T],T)](MapSink(downstream){_._2}) {
-
+ val overflow = new OverflowSink[(Session[T],T)](downstream.map(_._2)) {
// Once a value leaves the overflow, then we can credit the
// session so that more messages can be accepted.
override protected def onDelivered(event:(Session[T],T)) = {
@@ -223,6 +256,8 @@ class SinkMux[T](val downstream:Sink[T],
*/
class Session[T](val producer_queue:DispatchQueue, var credits:Int,
mux:SinkMux[T]) extends Sink[T] {
+ var refiller:Runnable = NOOP
+
private def session_max_credits = mux.session_max_credits
private def sizer = mux.sizer
private def downstream = mux.source
@@ -255,7 +290,6 @@ class Session[T](val producer_queue:Disp
// producer serial dispatch queue
///////////////////////////////////////////////////
- var refiller:Runnable = ^{}
override def full = {
assert(getCurrentQueue eq producer_queue)
@@ -311,11 +345,12 @@ trait Sizer[T] {
*/
class QueueSink[T](val sizer:Sizer[T], var maxSize:Int=1024*32) extends
Sink[T] {
+ var refiller:Runnable = NOOP
+
var buffer = new LinkedList[T]()
private var size = 0
var drainer: Runnable = null
- var refiller: Runnable = null
def full = size >= maxSize
def poll = buffer.poll
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1086622&r1=1086621&r2=1086622&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
Tue Mar 29 15:06:07 2011
@@ -30,7 +30,7 @@ import collection.mutable.{HashMap, List
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class Topic(val router:LocalRouter, val name:String, val config:TopicDTO, val
id:Long) extends DomainDestination {
+class Topic(val router:LocalRouter, val destination_dto:TopicDestinationDTO,
val config:TopicDTO, val id:Long) extends DomainDestination {
var producers = ListBuffer[BindableDeliveryProducer]()
var consumers = ListBuffer[DeliveryConsumer]()
@@ -39,6 +39,8 @@ class Topic(val router:LocalRouter, val
import OptionSupport._
+ def virtual_host: VirtualHost = router.virtual_host
+
def slow_consumer_policy = config.slow_consumer_policy.getOrElse("block")
def is_same_ds(sub1:DurableSubscriptionDestinationDTO,
sub2:DurableSubscriptionDestinationDTO) = {
@@ -55,6 +57,23 @@ class Topic(val router:LocalRouter, val
r.bind(list)
})
+ case destination:DurableSubscriptionDestinationDTO=>
+
+ val queue =
router.topic_domain.get_or_create_durable_subscription(destination)
+ if( !durable_subscriptions.contains(queue) ) {
+ durable_subscriptions += queue
+ val list = List(queue)
+ producers.foreach({ r=>
+ r.bind(list)
+ })
+ }
+
+ // Typically durable subs are only consumed by one connection at a
time. So collocate the
+ // queue onto the consumer's dispatch queue.
+ queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
+ queue.bind(destination, consumer)
+ consumer_queues += consumer->queue
+
case destination:TopicDestinationDTO=>
var target = consumer
slow_consumer_policy match {
@@ -78,22 +97,6 @@ class Topic(val router:LocalRouter, val
r.bind(list)
})
- case destination:DurableSubscriptionDestinationDTO=>
-
- val queue =
router.topic_domain.get_or_create_durable_subscription(destination)
- if( !durable_subscriptions.contains(queue) ) {
- durable_subscriptions += queue
- val list = List(queue)
- producers.foreach({ r=>
- r.bind(list)
- })
- }
-
- // Typically durable subs are only consumed by on connection at a
time. So collocate the
- // queue onto the consumer's dispatch queue.
- queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
- queue.bind(destination, consumer)
- consumer_queues += consumer->queue
}
}
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1086622&r1=1086621&r2=1086622&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Tue Mar 29 15:06:07 2011
@@ -400,7 +400,7 @@ class StompProtocolHandler extends Proto
override def on_transport_connected() = {
- session_manager = new SinkMux[StompFrame](
MapSink(connection.transport_sink){x=>
+ session_manager = new SinkMux[StompFrame]( connection.transport_sink.map
{x=>
trace("sending frame: %s", x)
x
}, dispatchQueue, StompFrame)
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala?rev=1086622&r1=1086621&r2=1086622&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
Tue Mar 29 15:06:07 2011
@@ -23,100 +23,108 @@ import _root_.org.fusesource.hawtbuf.{By
import java.io._
import org.apache.activemq.apollo.broker.{KeyStorage, ProtocolException}
import javax.net.ssl.{SSLSocket, SSLContext}
+import org.scalatest.matchers.ShouldMatchers
/**
* A simple Stomp client used for testing purposes
*/
- class StompClient {
+class StompClient extends ShouldMatchers {
- var socket:Socket = new Socket
- var out:OutputStream = null
- var in:InputStream = null
- val bufferSize = 64*1204
- var key_storeage:KeyStorage=null
-
- def open(host: String, port: Int) = {
-
- socket = if( key_storeage!=null ) {
- val context = SSLContext.getInstance("TLS")
- context.init(key_storeage.create_key_managers,
key_storeage.create_trust_managers, null)
- context.getSocketFactory().createSocket()
- //
socket.asInstanceOf[SSLSocket].setEnabledCipherSuites(Array("SSL_RSA_WITH_RC4_128_MD5"))
- // socket
- } else {
- new Socket
+ var socket:Socket = new Socket
+ var out:OutputStream = null
+ var in:InputStream = null
+ val bufferSize = 64*1204
+ var key_storeage:KeyStorage=null
+
+ def open(host: String, port: Int) = {
+
+ socket = if( key_storeage!=null ) {
+ val context = SSLContext.getInstance("TLS")
+ context.init(key_storeage.create_key_managers,
key_storeage.create_trust_managers, null)
+ context.getSocketFactory().createSocket()
+ //
socket.asInstanceOf[SSLSocket].setEnabledCipherSuites(Array("SSL_RSA_WITH_RC4_128_MD5"))
+ // socket
+ } else {
+ new Socket
+ }
+ socket.connect(new InetSocketAddress(host, port))
+ socket.setSoLinger(true, 0)
+ out = new BufferedOutputStream(socket.getOutputStream, bufferSize)
+ in = new BufferedInputStream(socket.getInputStream, bufferSize)
+ }
+
+ def close() = {
+ socket.close
+ }
+
+ def write(frame:String) = {
+ out.write(frame.getBytes("UTF-8"))
+ out.write(0)
+ out.write('\n')
+ out.flush
+ }
+
+ def write(frame:Array[Byte]) = {
+ out.write(frame)
+ out.write(0)
+ out.write('\n')
+ out.flush
+ }
+
+ def skip():Unit = {
+ var c = in.read
+ while( c >= 0 ) {
+ if( c==0 ) {
+ return
}
- socket.connect(new InetSocketAddress(host, port))
- socket.setSoLinger(true, 0)
- out = new BufferedOutputStream(socket.getOutputStream, bufferSize)
- in = new BufferedInputStream(socket.getInputStream, bufferSize)
+ c = in.read()
}
+ throw new EOFException()
+ }
- def close() = {
- socket.close
- }
-
- def write(frame:String) = {
- out.write(frame.getBytes("UTF-8"))
- out.write(0)
- out.write('\n')
- out.flush
- }
-
- def write(frame:Array[Byte]) = {
- out.write(frame)
- out.write(0)
- out.write('\n')
- out.flush
- }
-
- def skip():Unit = {
- var c = in.read
- while( c >= 0 ) {
- if( c==0 ) {
- return
- }
- c = in.read()
+ def receive():String = {
+ var start = true;
+ val buffer = new BAOS()
+ var c = in.read
+ while( c >= 0 ) {
+ if( c==0 ) {
+ return new String(buffer.toByteArray, "UTF-8")
}
- throw new EOFException()
- }
-
- def receive():String = {
- var start = true;
- val buffer = new BAOS()
- var c = in.read
- while( c >= 0 ) {
- if( c==0 ) {
- return new String(buffer.toByteArray, "UTF-8")
- }
- if( !start || c!= Stomp.NEWLINE) {
- start = false
- buffer.write(c)
- }
- c = in.read()
+ if( !start || c!= Stomp.NEWLINE) {
+ start = false
+ buffer.write(c)
}
- throw new EOFException()
+ c = in.read()
}
+ throw new EOFException()
+ }
- def receiveAscii():AsciiBuffer = {
- val buffer = new BAOS()
- var c = in.read
- while( c >= 0 ) {
- if( c==0 ) {
- return buffer.toBuffer.ascii
- }
- buffer.write(c)
- c = in.read()
+ def wait_for_receipt(id:String): Unit = {
+ val frame = receive()
+ frame should startWith("RECEIPT\n")
+ frame should include("receipt-id:"+id+"\n")
+ }
+
+
+ def receiveAscii():AsciiBuffer = {
+ val buffer = new BAOS()
+ var c = in.read
+ while( c >= 0 ) {
+ if( c==0 ) {
+ return buffer.toBuffer.ascii
}
- throw new EOFException()
+ buffer.write(c)
+ c = in.read()
}
+ throw new EOFException()
+ }
- def receive(expect:String):String = {
- val rc = receive()
- if( !rc.startsWith(expect) ) {
- throw new ProtocolException("Expected "+expect)
- }
- rc
+ def receive(expect:String):String = {
+ val rc = receive()
+ if( !rc.startsWith(expect) ) {
+ throw new ProtocolException("Expected "+expect)
}
+ rc
+ }
- }
\ No newline at end of file
+}
\ No newline at end of file
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala?rev=1086622&r1=1086621&r2=1086622&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
Tue Mar 29 15:06:07 2011
@@ -35,7 +35,7 @@ class StompRemoteConsumer extends Remote
var outboundSink: OverflowSink[StompFrame] = null
def onConnected() = {
- outboundSink = new OverflowSink[StompFrame](MapSink(transport_sink) {x =>
x})
+ outboundSink = new OverflowSink[StompFrame](transport_sink.map(x=>x))
outboundSink.refiller = ^ {}
val stompDestination = destination match {
@@ -145,7 +145,7 @@ class StompRemoteProducer extends Remote
}
override def onConnected() = {
- outboundSink = new OverflowSink[StompFrame](MapSink(transport_sink) {x =>
x})
+ outboundSink = new OverflowSink[StompFrame](transport_sink.map(x=>x))
outboundSink.refiller = ^ {drain}
stompDestination = destination match {
Added:
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala?rev=1086622&view=auto
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala
(added)
+++
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala
Tue Mar 29 15:06:07 2011
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.util
+import org.fusesource.hawtdispatch._
+
+/**
+ * <p>
+ * Trait that exposes the {@link DispatchQueue} used to guard
+ * mutable access to the state of the object implementing this interface.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait Dispatched {
+ def dispatch_queue:DispatchQueue
+
+ protected def assert_dispatched = assert( getCurrentQueue == dispatch_queue )
+}
\ No newline at end of file