Author: chirino
Date: Mon Aug 15 06:09:16 2011
New Revision: 1157716
URL: http://svn.apache.org/viewvc?rev=1157716&view=rev
Log:
Better handle the case where a connection tries to use a virtual host that
is stopped.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1157716&r1=1157715&r2=1157716&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
Mon Aug 15 06:09:16 2011
@@ -875,20 +875,24 @@ class LocalRouter(val virtual_host:Virtu
}
def bind(destination: Array[DestinationDTO], consumer: DeliveryConsumer,
security: SecurityContext) = {
- consumer.retain
- val paths = destination.map(x=> (destination_parser.decode_path(x.path),
x) )
- dispatch_queue ! {
- val failures = paths.flatMap(x=> domain(x._2).can_bind_all(x._1, x._2,
consumer, security) )
- val rc = if( !failures.isEmpty ) {
- Some(failures.mkString("; "))
- } else {
- paths.foreach { x=>
- domain(x._2).bind(x._1, x._2, consumer, security)
+ if(!virtual_host.service_state.is_started) {
+ Some("virtual host stopped.")
+ } else {
+ consumer.retain
+ val paths = destination.map(x=> (destination_parser.decode_path(x.path),
x) )
+ dispatch_queue ! {
+ val failures = paths.flatMap(x=> domain(x._2).can_bind_all(x._1, x._2,
consumer, security) )
+ val rc = if( !failures.isEmpty ) {
+ Some(failures.mkString("; "))
+ } else {
+ paths.foreach { x=>
+ domain(x._2).bind(x._1, x._2, consumer, security)
+ }
+ None
}
- None
+ consumer.release
+ rc
}
- consumer.release
- rc
}
}
@@ -903,20 +907,24 @@ class LocalRouter(val virtual_host:Virtu
}
def connect(destinations: Array[DestinationDTO], producer:
BindableDeliveryProducer, security: SecurityContext) = {
- producer.retain
- val paths = destinations.map(x=> (destination_parser.decode_path(x.path),
x) )
- dispatch_queue ! {
-
- val failures = paths.flatMap(x=> domain(x._2).can_connect_all(x._1,
x._2, producer, security) )
- if( !failures.isEmpty ) {
- producer.release
- Some(failures.mkString("; "))
- } else {
- paths.foreach { x=>
- domain(x._2).connect(x._1, x._2, producer, security)
+ if(!virtual_host.service_state.is_started) {
+ Some("virtual host stopped.")
+ } else {
+ producer.retain
+ val paths = destinations.map(x=>
(destination_parser.decode_path(x.path), x) )
+ dispatch_queue ! {
+
+ val failures = paths.flatMap(x=> domain(x._2).can_connect_all(x._1,
x._2, producer, security) )
+ if( !failures.isEmpty ) {
+ producer.release
+ Some(failures.mkString("; "))
+ } else {
+ paths.foreach { x=>
+ domain(x._2).connect(x._1, x._2, producer, security)
+ }
+ producer.connected()
+ None
}
- producer.connected()
- None
}
}
}
@@ -932,28 +940,36 @@ class LocalRouter(val virtual_host:Virtu
}
def create(destinations:Array[DestinationDTO], security: SecurityContext) =
dispatch_queue ! {
- val paths = destinations.map(x=> (destination_parser.decode_path(x.path),
x) )
- val failures = paths.flatMap(x=> domain(x._2).can_create_destination(x._1,
x._2, security) )
- if( !failures.isEmpty ) {
- Some(failures.mkString("; "))
+ if(!virtual_host.service_state.is_started) {
+ Some("virtual host stopped.")
} else {
- paths.foreach { x=>
- domain(x._2).create_destination(x._1, x._2, security)
+ val paths = destinations.map(x=>
(destination_parser.decode_path(x.path), x) )
+ val failures = paths.flatMap(x=>
domain(x._2).can_create_destination(x._1, x._2, security) )
+ if( !failures.isEmpty ) {
+ Some(failures.mkString("; "))
+ } else {
+ paths.foreach { x=>
+ domain(x._2).create_destination(x._1, x._2, security)
+ }
+ None
}
- None
}
}
def delete(destinations:Array[DestinationDTO], security: SecurityContext) =
dispatch_queue ! {
- val paths = destinations.map(x=> (destination_parser.decode_path(x.path),
x) )
- val failures = paths.flatMap(x=>
domain(x._2).can_destroy_destination(x._1, x._2, security) )
- if( !failures.isEmpty ) {
- Some(failures.mkString("; "))
+ if(!virtual_host.service_state.is_started) {
+ Some("virtual host stopped.")
} else {
- paths.foreach { x=>
- domain(x._2).destroy_destination(x._1, x._2)
+ val paths = destinations.map(x=>
(destination_parser.decode_path(x.path), x) )
+ val failures = paths.flatMap(x=>
domain(x._2).can_destroy_destination(x._1, x._2, security) )
+ if( !failures.isEmpty ) {
+ Some(failures.mkString("; "))
+ } else {
+ paths.foreach { x=>
+ domain(x._2).destroy_destination(x._1, x._2)
+ }
+ None
}
- None
}
}
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1157716&r1=1157715&r2=1157716&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
Mon Aug 15 06:09:16 2011
@@ -110,6 +110,10 @@ class VirtualHost(val broker: Broker, va
var connection_log:Log = _
var console_log:Log = _
+ // This gets set if a client should get redirected to another address.
+ @volatile
+ var client_redirect:Option[String] = None
+
override def toString = if (config==null) "virtual-host" else "virtual-host:
"+config.id
/**
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1157716&r1=1157715&r2=1157716&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
Mon Aug 15 06:09:16 2011
@@ -400,6 +400,7 @@ object Stomp {
val SESSION = ascii("session")
val RESPONSE_ID = ascii("response-id")
val SERVER = ascii("server")
+ val REDIRECT_HEADER = ascii("redirect")
val BROWSER = ascii("browser")
val EXCLUSIVE = ascii("exclusive")
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1157716&r1=1157715&r2=1157716&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Mon Aug 15 06:09:16 2011
@@ -548,6 +548,12 @@ class StompProtocolHandler extends Proto
case x:Break=>
}
+ private def async_die(headers:HeaderMap, body:String) = try {
+ die(headers, body)
+ } catch {
+ case x:Break=>
+ }
+
private def die[T](msg:String, e:Throwable=null):T = {
if( e!=null) {
debug(e, "Shutting connection down due to: "+msg)
@@ -801,6 +807,11 @@ class StompProtocolHandler extends Proto
if(host==null) {
async_die("Invalid virtual host: "+host_header.get)
noop
+ } else if(!host.service_state.is_started) {
+ var headers = (MESSAGE_HEADER, encode_header("Virtual host stopped"))
:: Nil
+ host.client_redirect.foreach(x=> headers ::=
REDIRECT_HEADER->encode_header(x) )
+ async_die(headers, "")
+ noop
} else {
this.host=host
if( host.authenticator!=null && host.authorizer!=null ) {
Modified:
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala?rev=1157716&r1=1157715&r2=1157716&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
Mon Aug 15 06:09:16 2011
@@ -144,6 +144,8 @@ trait BaseService extends Service with D
_service_state = new FAILED
done
}
+ case state:CREATED =>
+ done
case state:STOPPED =>
done
case state:STOPPING =>