Author: chirino
Date: Fri Mar 18 18:38:38 2011
New Revision: 1083007
URL: http://svn.apache.org/viewvc?rev=1083007&view=rev
Log:
Consolidating the can_connect / can_bind logic in the LocalRouter.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1083007&r1=1083006&r2=1083007&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
Fri Mar 18 18:38:38 2011
@@ -34,11 +34,9 @@ trait DomainDestination {
def id:Long
def name:String
- def can_bind(destination:DestinationDTO, consumer:DeliveryConsumer,
security:SecurityContext):Boolean
def bind (destination:DestinationDTO, consumer:DeliveryConsumer)
def unbind (consumer:DeliveryConsumer, persistent:Boolean)
- def can_connect(destination:DestinationDTO,
producer:BindableDeliveryProducer, security:SecurityContext):Boolean
def connect (destination:DestinationDTO, producer:BindableDeliveryProducer)
def disconnect (producer:BindableDeliveryProducer)
@@ -142,12 +140,12 @@ class LocalRouter(val host:VirtualHost)
// binds any matching wild card subs and producers...
import JavaConversions._
consumers_by_path.get( path ).foreach { x=>
- if( dest.can_bind(x.destination, x.consumer, x.security) ) {
+ if( can_bind_one(path, x.destination, x.consumer, x.security) ) {
dest.bind(x.destination, x.consumer)
}
}
producers_by_path.get( path ).foreach { x=>
- if( dest.can_connect(x.destination, x.producer, x.security) ) {
+ if( can_connect_one(path, x.destination, x.producer, x.security) ) {
dest.connect(x.destination, x.producer)
}
}
@@ -157,8 +155,8 @@ class LocalRouter(val host:VirtualHost)
destination_by_path.remove(path, dest)
destination_by_id.remove(dest.id)
}
-
- def can_bind(path:Path, destination:DestinationDTO,
consumer:DeliveryConsumer, security:SecurityContext):Result[Zilch, String] = {
+ def can_bind_one(path:Path, destination:DestinationDTO,
consumer:DeliveryConsumer, security:SecurityContext):Boolean
+ def can_bind_all(path:Path, destination:DestinationDTO,
consumer:DeliveryConsumer, security:SecurityContext):Result[Zilch, String] = {
val wildcard = PathParser.containsWildCards(path)
var matches = get_destination_matches(path)
@@ -177,8 +175,8 @@ class LocalRouter(val host:VirtualHost)
}
matches.foreach { dest =>
- if( !dest.can_bind(destination, consumer, security) ) {
- return Failure("Not authorized to reveive from the destination.")
+ if( !can_bind_one(path, destination, consumer, security) ) {
+ return Failure("Not authorized to receive from the destination.")
}
}
}
@@ -188,7 +186,7 @@ class LocalRouter(val host:VirtualHost)
def bind(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer,
security:SecurityContext):Unit = {
var matches = get_destination_matches(path)
matches.foreach { dest=>
- if( dest.can_bind(destination, consumer, security) ) {
+ if( can_bind_one(path, destination, consumer, security) ) {
dest.bind(destination, consumer)
}
}
@@ -213,13 +211,22 @@ class LocalRouter(val host:VirtualHost)
}
- def can_connect(path:Path, destination:DestinationDTO,
producer:BindableDeliveryProducer, security:SecurityContext):Result[Zilch,
String] = {
+ def can_connect_one(path:Path, destination:DestinationDTO,
producer:BindableDeliveryProducer, security:SecurityContext):Boolean
+
+ def can_connect_all(path:Path, destination:DestinationDTO,
producer:BindableDeliveryProducer, security:SecurityContext):Result[Zilch,
String] = {
val wildcard = PathParser.containsWildCards(path)
var matches = get_destination_matches(path)
- // Should we attempt to auto create the destination?
- if( !wildcard ) {
+ if( wildcard ) {
+
+ // Wild card sends never fail authorization... since a destination
+ // may get created later which the user is authorized to use.
+ Success(Zilch)
+
+ } else {
+
+ // Should we attempt to auto create the destination?
if ( matches.isEmpty && auto_create_destinations ) {
val rc = create_destination(path, security)
if( rc.failed ) {
@@ -227,23 +234,24 @@ class LocalRouter(val host:VirtualHost)
}
matches = get_destination_matches(path)
}
+
if( matches.isEmpty ) {
return Failure("The destination does not exist.")
}
- matches.foreach { dest =>
- if( !dest.can_connect(destination, producer, security) ) {
- return Failure("Not authorized to send to the destination.")
- }
+ // since this is not a wild card, we should have only matched one..
+ assert( matches.size == 1 )
+ if( !can_connect_one(path, destination, producer, security) ) {
+ return Failure("Not authorized to send to the destination.")
}
- }
- Success(Zilch)
+ Success(Zilch)
+ }
}
def connect(path:Path, destination:DestinationDTO,
producer:BindableDeliveryProducer, security:SecurityContext):Unit = {
get_destination_matches(path).foreach { dest=>
- if( dest.can_connect(destination, producer, security) ) {
+ if( can_connect_one(path, destination, producer, security) ) {
dest.connect(destination, producer)
}
}
@@ -262,29 +270,12 @@ class LocalRouter(val host:VirtualHost)
val topic_domain = new TopicDomain
class TopicDomain extends Domain[Topic] {
- val topic_id_counter = new LongCounter
+ val topic_id_counter = new LongCounter()
// Stores durable subscription queues.
val durable_subscriptions_by_path = new PathMap[Queue]()
val durable_subscriptions_by_id = HashMap[(String,String), Queue]()
-
- override def can_bind(path:Path, destination:DestinationDTO,
consumer:DeliveryConsumer, security:SecurityContext):Result[Zilch, String] = {
- var rc = super.can_bind(path, destination, consumer, security)
- if( !rc.failed ) {
- destination match {
- case destination:DurableSubscriptionDestinationDTO=>
- // So the user can subscribe to the topic.. but can he create
durable sub??
- val qc =
DurableSubscriptionQueueBinding.create(destination).config(host).asInstanceOf[DurableSubscriptionDTO]
- if( !can_create_ds(qc, security) ) {
- return Failure("Not authorized to create the durable
subscription.")
- }
- case _ =>
- }
- }
- rc
- }
-
def
get_or_create_durable_subscription(destination:DurableSubscriptionDestinationDTO):Queue
= {
val key = (destination.client_id, destination.subscription_id)
durable_subscriptions_by_id.get( key ).getOrElse {
@@ -363,6 +354,31 @@ class LocalRouter(val host:VirtualHost)
Success(topic)
}
+ def can_bind_one(path:Path, destination:DestinationDTO,
consumer:DeliveryConsumer, security:SecurityContext):Boolean = {
+ val config = topic_config(path)
+ val authorizer = host.authorizer
+ if( authorizer!=null && security!=null &&
!authorizer.can_receive_from(security, host, config) ) {
+ return false;
+ }
+
+ destination match {
+ case destination:DurableSubscriptionDestinationDTO=>
+ // So the user can subscribe to the topic.. but can he create
durable sub??
+ val qc =
DurableSubscriptionQueueBinding.create(destination).config(host).asInstanceOf[DurableSubscriptionDTO]
+ if( !can_create_ds(qc, security) ) {
+ return false;
+ }
+ case _ =>
+ }
+ true
+ }
+
+ def can_connect_one(path:Path, destination:DestinationDTO,
producer:BindableDeliveryProducer, security:SecurityContext):Boolean = {
+ val config = topic_config(path)
+ val authorizer = host.authorizer
+ !(authorizer!=null && security!=null &&
!authorizer.can_send_to(security, host, config) )
+ }
+
}
val queue_domain = new QueueDomain
@@ -414,6 +430,31 @@ class LocalRouter(val host:VirtualHost)
}
}
+
+ def can_bind_one(path:Path, dto:DestinationDTO, consumer:DeliveryConsumer,
security: SecurityContext):Boolean = {
+ val binding = QueueDomainQueueBinding.create(dto)
+ val config = binding.config(host)
+ if( host.authorizer!=null && security!=null ) {
+ if( consumer.browser ) {
+ if( !host.authorizer.can_receive_from(security, host, config) ) {
+ return false;
+ }
+ } else {
+ if( !host.authorizer.can_consume_from(security, host, config) ) {
+ return false
+ }
+ }
+ }
+ return true;
+ }
+
+ def can_connect_one(path:Path, dto:DestinationDTO,
producer:BindableDeliveryProducer, security:SecurityContext):Boolean = {
+ val binding = QueueDomainQueueBinding.create(dto)
+ val config = binding.config(host)
+ val authorizer = host.authorizer
+ !( authorizer!=null && security!=null &&
!authorizer.can_send_to(security, host, config) )
+ }
+
}
/////////////////////////////////////////////////////////////////////////////
@@ -524,7 +565,7 @@ class LocalRouter(val host:VirtualHost)
//
/////////////////////////////////////////////////////////////////////////////
- def domain(destination: DestinationDTO) = destination match {
+ def domain(destination: DestinationDTO):Domain[_ <: DomainDestination] =
destination match {
case x:TopicDestinationDTO => topic_domain
case x:DurableSubscriptionDestinationDTO => topic_domain
case x:QueueDestinationDTO => queue_domain
@@ -535,7 +576,7 @@ class LocalRouter(val host:VirtualHost)
consumer.retain
val paths = destination.map(x=> (DestinationParser.decode_path(x.name), x)
)
dispatch_queue ! {
- val failures = paths.map(x=> domain(x._2).can_bind(x._1, x._2, consumer,
security) ).flatMap( _.failure_option )
+ val failures = paths.map(x=> domain(x._2).can_bind_all(x._1, x._2,
consumer, security) ).flatMap( _.failure_option )
val rc = if( !failures.isEmpty ) {
Failure(failures.mkString("; "))
} else {
@@ -564,7 +605,7 @@ class LocalRouter(val host:VirtualHost)
val paths = destinations.map(x=> (DestinationParser.decode_path(x.name),
x) )
dispatch_queue ! {
- val failures = paths.map(x=> domain(x._2).can_connect(x._1, x._2,
producer, security) ).flatMap( _.failure_option )
+ val failures = paths.map(x=> domain(x._2).can_connect_all(x._1, x._2,
producer, security) ).flatMap( _.failure_option )
if( !failures.isEmpty ) {
producer.release
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1083007&r1=1083006&r2=1083007&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Fri Mar 18 18:38:38 2011
@@ -539,21 +539,6 @@ class Queue(val router: LocalRouter, val
def disconnected() = throw new RuntimeException("unsupported")
- def can_bind(destination:DestinationDTO, consumer:DeliveryConsumer,
security: SecurityContext):Boolean = {
- if( host.authorizer!=null && security!=null ) {
- if( consumer.browser ) {
- if( !host.authorizer.can_receive_from(security, host, config) ) {
- return false;
- }
- } else {
- if( !host.authorizer.can_consume_from(security, host, config) ) {
- return false
- }
- }
- }
- return true;
- }
-
def bind(destination:DestinationDTO, consumer: DeliveryConsumer) = {
bind(consumer::Nil)
}
@@ -561,15 +546,6 @@ class Queue(val router: LocalRouter, val
unbind(consumer::Nil)
}
- def can_connect(destination:DestinationDTO,
producer:BindableDeliveryProducer, security:SecurityContext):Boolean = {
- val authorizer = host.authorizer
- if( authorizer!=null && security!=null &&
!authorizer.can_send_to(security, host, config) ) {
- false
- } else {
- true
- }
- }
-
def connect (destination:DestinationDTO, producer:BindableDeliveryProducer)
= {
import OptionSupport._
if( config.unified.getOrElse(false) ) {
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1083007&r1=1083006&r2=1083007&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
Fri Mar 18 18:38:38 2011
@@ -41,15 +41,6 @@ class Topic(val router:LocalRouter, val
def slow_consumer_policy = config.slow_consumer_policy.getOrElse("block")
- def can_bind(destination: DestinationDTO, consumer:DeliveryConsumer,
security:SecurityContext) = {
- val authorizer = router.host.authorizer
- if( authorizer!=null && security!=null &&
!authorizer.can_receive_from(security, router.host, config) ) {
- false
- } else {
- true
- }
- }
-
def is_same_ds(sub1:DurableSubscriptionDestinationDTO,
sub2:DurableSubscriptionDestinationDTO) = {
(sub1.client_id, sub1.subscription_id) == (sub2.client_id,
sub2.subscription_id)
}
@@ -173,15 +164,6 @@ class Topic(val router:LocalRouter, val
}
}
- def can_connect(destination:DestinationDTO,
producer:BindableDeliveryProducer, security:SecurityContext):Boolean = {
- val authorizer = router.host.authorizer
- if( authorizer!=null && security!=null &&
!authorizer.can_send_to(security, router.host, config) ) {
- false
- } else {
- true
- }
- }
-
def connect (destination:DestinationDTO, producer:BindableDeliveryProducer)
= {
producers += producer
producer.bind(consumers.toList ::: durable_subscriptions.toList)
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1083007&r1=1083006&r2=1083007&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Fri Mar 18 18:38:38 2011
@@ -1255,7 +1255,7 @@ class StompSecurityTest extends StompTes
val frame = client.receive()
frame should startWith("ERROR\n")
- frame should include("message:Not authorized to reveive from the
destination.\n")
+ frame should include("message:Not authorized to receive from the
destination.\n")
}
// test("Consume authorized and JMSXUserID is set on message") {