Author: chirino
Date: Mon Aug 15 06:09:16 2011
New Revision: 1157716

URL: http://svn.apache.org/viewvc?rev=1157716&view=rev
Log:
Handle more gracefully the case where a connection tries to use a virtual host
that is stopped.

Modified:
    
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala

Modified: 
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1157716&r1=1157715&r2=1157716&view=diff
==============================================================================
--- 
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
 (original)
+++ 
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
 Mon Aug 15 06:09:16 2011
@@ -875,20 +875,24 @@ class LocalRouter(val virtual_host:Virtu
   }
 
   def bind(destination: Array[DestinationDTO], consumer: DeliveryConsumer, 
security: SecurityContext) = {
-    consumer.retain
-    val paths = destination.map(x=> (destination_parser.decode_path(x.path), 
x) )
-    dispatch_queue ! {
-      val failures = paths.flatMap(x=> domain(x._2).can_bind_all(x._1, x._2, 
consumer, security) )
-      val rc = if( !failures.isEmpty ) {
-        Some(failures.mkString("; "))
-      } else {
-        paths.foreach { x=>
-          domain(x._2).bind(x._1, x._2, consumer, security)
+    if(!virtual_host.service_state.is_started) {
+      Some("virtual host stopped.")
+    } else {
+      consumer.retain
+      val paths = destination.map(x=> (destination_parser.decode_path(x.path), 
x) )
+      dispatch_queue ! {
+        val failures = paths.flatMap(x=> domain(x._2).can_bind_all(x._1, x._2, 
consumer, security) )
+        val rc = if( !failures.isEmpty ) {
+          Some(failures.mkString("; "))
+        } else {
+          paths.foreach { x=>
+            domain(x._2).bind(x._1, x._2, consumer, security)
+          }
+          None
         }
-        None
+        consumer.release
+        rc
       }
-      consumer.release
-      rc
     }
   }
 
@@ -903,20 +907,24 @@ class LocalRouter(val virtual_host:Virtu
   }
 
   def connect(destinations: Array[DestinationDTO], producer: 
BindableDeliveryProducer, security: SecurityContext) = {
-    producer.retain
-    val paths = destinations.map(x=> (destination_parser.decode_path(x.path), 
x) )
-    dispatch_queue ! {
-
-      val failures = paths.flatMap(x=> domain(x._2).can_connect_all(x._1, 
x._2, producer, security) )
-      if( !failures.isEmpty ) {
-        producer.release
-        Some(failures.mkString("; "))
-      } else {
-        paths.foreach { x=>
-          domain(x._2).connect(x._1, x._2, producer, security)
+    if(!virtual_host.service_state.is_started) {
+      Some("virtual host stopped.")
+    } else {
+      producer.retain
+      val paths = destinations.map(x=> 
(destination_parser.decode_path(x.path), x) )
+      dispatch_queue ! {
+
+        val failures = paths.flatMap(x=> domain(x._2).can_connect_all(x._1, 
x._2, producer, security) )
+        if( !failures.isEmpty ) {
+          producer.release
+          Some(failures.mkString("; "))
+        } else {
+          paths.foreach { x=>
+            domain(x._2).connect(x._1, x._2, producer, security)
+          }
+          producer.connected()
+          None
         }
-        producer.connected()
-        None
       }
     }
   }
@@ -932,28 +940,36 @@ class LocalRouter(val virtual_host:Virtu
   }
 
   def create(destinations:Array[DestinationDTO], security: SecurityContext) = 
dispatch_queue ! {
-    val paths = destinations.map(x=> (destination_parser.decode_path(x.path), 
x) )
-    val failures = paths.flatMap(x=> domain(x._2).can_create_destination(x._1, 
x._2, security) )
-    if( !failures.isEmpty ) {
-      Some(failures.mkString("; "))
+    if(!virtual_host.service_state.is_started) {
+      Some("virtual host stopped.")
     } else {
-      paths.foreach { x=>
-        domain(x._2).create_destination(x._1, x._2, security)
+      val paths = destinations.map(x=> 
(destination_parser.decode_path(x.path), x) )
+      val failures = paths.flatMap(x=> 
domain(x._2).can_create_destination(x._1, x._2, security) )
+      if( !failures.isEmpty ) {
+        Some(failures.mkString("; "))
+      } else {
+        paths.foreach { x=>
+          domain(x._2).create_destination(x._1, x._2, security)
+        }
+        None
       }
-      None
     }
   }
 
   def delete(destinations:Array[DestinationDTO], security: SecurityContext) = 
dispatch_queue ! {
-    val paths = destinations.map(x=> (destination_parser.decode_path(x.path), 
x) )
-    val failures = paths.flatMap(x=> 
domain(x._2).can_destroy_destination(x._1, x._2, security) )
-    if( !failures.isEmpty ) {
-      Some(failures.mkString("; "))
+    if(!virtual_host.service_state.is_started) {
+      Some("virtual host stopped.")
     } else {
-      paths.foreach { x=>
-        domain(x._2).destroy_destination(x._1, x._2)
+      val paths = destinations.map(x=> 
(destination_parser.decode_path(x.path), x) )
+      val failures = paths.flatMap(x=> 
domain(x._2).can_destroy_destination(x._1, x._2, security) )
+      if( !failures.isEmpty ) {
+        Some(failures.mkString("; "))
+      } else {
+        paths.foreach { x=>
+          domain(x._2).destroy_destination(x._1, x._2)
+        }
+        None
       }
-      None
     }
   }
 

Modified: 
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1157716&r1=1157715&r2=1157716&view=diff
==============================================================================
--- 
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
 (original)
+++ 
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
 Mon Aug 15 06:09:16 2011
@@ -110,6 +110,10 @@ class VirtualHost(val broker: Broker, va
   var connection_log:Log = _
   var console_log:Log = _
 
+  // This is set when the client should be redirected to another address.
+  @volatile
+  var client_redirect:Option[String] = None
+
   override def toString = if (config==null) "virtual-host" else "virtual-host: 
"+config.id
 
   /**

Modified: 
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1157716&r1=1157715&r2=1157716&view=diff
==============================================================================
--- 
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
 (original)
+++ 
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
 Mon Aug 15 06:09:16 2011
@@ -400,6 +400,7 @@ object Stomp {
   val SESSION = ascii("session")
   val RESPONSE_ID = ascii("response-id")
   val SERVER = ascii("server")
+  val REDIRECT_HEADER = ascii("redirect")
 
   val BROWSER = ascii("browser")
   val EXCLUSIVE = ascii("exclusive")

Modified: 
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1157716&r1=1157715&r2=1157716&view=diff
==============================================================================
--- 
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
 (original)
+++ 
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
 Mon Aug 15 06:09:16 2011
@@ -548,6 +548,12 @@ class StompProtocolHandler extends Proto
     case x:Break=>
   }
 
+  private def async_die(headers:HeaderMap, body:String) = try {
+    die(headers, body)
+  } catch {
+    case x:Break=>
+  }
+
   private def die[T](msg:String, e:Throwable=null):T = {
     if( e!=null) {
       debug(e, "Shutting connection down due to: "+msg)
@@ -801,6 +807,11 @@ class StompProtocolHandler extends Proto
       if(host==null) {
         async_die("Invalid virtual host: "+host_header.get)
         noop
+      } else if(!host.service_state.is_started) {
+        var headers = (MESSAGE_HEADER, encode_header("Virtual host stopped")) 
:: Nil
+        host.client_redirect.foreach(x=> headers ::= 
REDIRECT_HEADER->encode_header(x) )
+        async_die(headers, "")
+        noop
       } else {
         this.host=host
         if( host.authenticator!=null &&  host.authorizer!=null ) {

Modified: 
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala?rev=1157716&r1=1157715&r2=1157716&view=diff
==============================================================================
--- 
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
 (original)
+++ 
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
 Mon Aug 15 06:09:16 2011
@@ -144,6 +144,8 @@ trait BaseService extends Service with D
               _service_state = new FAILED
               done
           }
+        case state:CREATED =>
+          done
         case state:STOPPED =>
           done
         case state:STOPPING =>


Reply via email to