Author: kwall
Date: Tue Oct 28 14:16:18 2014
New Revision: 1634884

URL: http://svn.apache.org/r1634884
Log:
QPID-6192: [Java Broker] On close, close the connections before exchanges/queues

* Exchanges/Queue now check virtualhost state prior to routing a message to 
queue/consumer.

Modified:
    
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
    
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
    
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java

Modified: 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1634884&r1=1634883&r2=1634884&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
 Tue Oct 28 14:16:18 2014
@@ -507,8 +507,13 @@ public abstract class AbstractExchange<T
                                                                                
         final ServerTransaction txn,
                                                                                
         final Action<? super MessageInstance> postEnqueueAction)
     {
-        List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties);
+        if (_virtualHost.getState() != State.ACTIVE)
+        {
+            _logger.debug("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
+            return 0;
+        }
 
+        List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties);
         if(queues == null || queues.isEmpty())
         {
             Exchange altExchange = getAlternateExchange();

Modified: 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1634884&r1=1634883&r2=1634884&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
 Tue Oct 28 14:16:18 2014
@@ -490,6 +490,7 @@ public abstract class AbstractConfigured
     {
         if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
         {
+            beforeClose();
             closeChildren();
             onClose();
             unregister(false);
@@ -497,6 +498,10 @@ public abstract class AbstractConfigured
         }
     }
 
+    protected void beforeClose()
+    {
+    }
+
     protected void onClose()
     {
     }

Modified: 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java?rev=1634884&r1=1634883&r2=1634884&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
 Tue Oct 28 14:16:18 2014
@@ -78,9 +78,9 @@ public final class BrokerModel extends M
         addRelationship(VirtualHostNode.class, VirtualHost.class);
         addRelationship(VirtualHostNode.class, RemoteReplicationNode.class);
 
+        addRelationship(VirtualHost.class, Connection.class);
         addRelationship(VirtualHost.class, Exchange.class);
         addRelationship(VirtualHost.class, Queue.class);
-        addRelationship(VirtualHost.class, Connection.class);
 
         addRelationship(Port.class, VirtualHostAlias.class);
 

Modified: 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1634884&r1=1634883&r2=1634884&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
 Tue Oct 28 14:16:18 2014
@@ -1923,6 +1923,12 @@ public abstract class AbstractQueue<X ex
                         sub.releaseSendLock();
                     }
                 }
+
+                if (_virtualHost.getState() != State.ACTIVE)
+                {
+                    _logger.debug("Subscription flush halted owing to virtualhost state " + _virtualHost.getState());
+                    return true;
+                }
             }
         }
         finally
@@ -1967,12 +1973,13 @@ public abstract class AbstractQueue<X ex
         boolean atTail = false;
 
         boolean subActive = sub.isActive() && !sub.isSuspended();
+
         if (subActive)
         {
 
             QueueEntry node  = getNextAvailableEntry(sub);
 
-            if (node != null && node.isAvailable())
+            if (_virtualHost.getState() == State.ACTIVE && node != null && node.isAvailable())
             {
                 if (sub.hasInterest(node) && mightAssign(sub, node))
                 {
@@ -2178,6 +2185,12 @@ public abstract class AbstractQueue<X ex
                                     sub.flushBatched();
                                     break;
                                 }
+                                if (_virtualHost.getState() != State.ACTIVE)
+                                {
+                                    _logger.debug("Queue process halted owing to virtualhost state " + _virtualHost.getState());
+
+                                    break;
+                                }
                             }
 
                         }

Modified: 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1634884&r1=1634883&r2=1634884&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
 Tue Oct 28 14:16:18 2014
@@ -664,9 +664,15 @@ public abstract class AbstractVirtualHos
         return _broker.getSecurityManager();
     }
 
-    protected void onClose()
+    @Override
+    protected void beforeClose()
     {
         setState(State.UNAVAILABLE);
+    }
+
+    @Override
+    protected void onClose()
+    {
         //Stop Connections
         _connectionRegistry.close();
         _dtxRegistry.close();



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to