Author: kwall
Date: Tue May 22 18:18:50 2012
New Revision: 1341584

URL: http://svn.apache.org/viewvc?rev=1341584&view=rev
Log:
QPID-4006: BDB HA. Make passivation async to avoid holding up the BDB thread. 
Introduce VirtualHost ERROR state to be used when virtual host is unable to 
activate or passivate itself completely. Change MULTISYNC mode to use 
WRITE_NO_SYNC. Stop relying on Monitor nodes to perform some tests.

Work of Robbie Gemmell <[email protected]> and myself.

squashme

Modified:
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
    
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
    
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java
    
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java?rev=1341584&r1=1341583&r2=1341584&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
 Tue May 22 18:18:50 2012
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 
@@ -67,7 +68,7 @@ public class BDBHAMessageStore extends A
 {
     private static final String MUTLI_SYNC = "MUTLI_SYNC";
     private static final String DEFAULT_REPLICATION_POLICY =
-        MUTLI_SYNC + "," + SyncPolicy.NO_SYNC.name() + "," + 
ReplicaAckPolicy.SIMPLE_MAJORITY.name();
+        MUTLI_SYNC + "," + SyncPolicy.WRITE_NO_SYNC.name() + "," + 
ReplicaAckPolicy.SIMPLE_MAJORITY.name();
 
     private static final Logger LOGGER = 
Logger.getLogger(BDBHAMessageStore.class);
 
@@ -107,7 +108,7 @@ public class BDBHAMessageStore extends A
 
         if(_replicationPolicy.startsWith(MUTLI_SYNC))
         {
-            _replicationDurability = 
Durability.parse(_replicationPolicy.replaceFirst(MUTLI_SYNC, 
SyncPolicy.SYNC.name()));
+            _replicationDurability = 
Durability.parse(_replicationPolicy.replaceFirst(MUTLI_SYNC, 
SyncPolicy.WRITE_NO_SYNC.name()));
             _localMultiSyncCommits = true;
         }
         else
@@ -388,11 +389,11 @@ public class BDBHAMessageStore extends A
                 activateStoreAsync();
                 break;
             case REPLICA:
-                passivateStore();
+                passivateStoreAsync();
                 break;
             case DETACHED:
                 LOGGER.error("BDB replicated node in detached state, therefore 
passivating.");
-                passivateStore();
+                passivateStoreAsync();
                 break;
             case UNKNOWN:
                 LOGGER.warn("BDB replicated node in unknown state (hopefully 
temporarily)");
@@ -403,20 +404,6 @@ public class BDBHAMessageStore extends A
             }
         }
 
-        /** synchronously calls passivate. This is acceptable because {@link 
HAMessageStore#passivate()} is expected to be fast */
-        private void passivateStore()
-        {
-            try
-            {
-                passivate();
-            }
-            catch(Exception e)
-            {
-                LOGGER.error("Unable to passivate", e);
-                throw new RuntimeException("Unable to passivate", e);
-            }
-        }
-
         /**
          * Calls {@link MessageStore#activate()}.
          *
@@ -429,34 +416,93 @@ public class BDBHAMessageStore extends A
          */
         private void activateStoreAsync()
         {
+            String threadName = "BDBHANodeActivationThread-" + _name;
+            executeStateChangeAsync(new Callable<Void>()
+            {
+                @Override
+                public Void call() throws Exception
+                {
+                    try
+                    {
+                        activate();
+                    }
+                    catch (Exception e)
+                    {
+                        LOGGER.error("Failed to activate on hearing MASTER 
change event",e);
+                        throw e;
+                    }
+                    return null;
+                }
+            }, threadName);
+        }
+
+        /**
+         * Calls {@link #passivate()}.
+         *
+         * <p/>
+         * This is done a background thread, in line with
+         * {@link StateChangeListener#stateChange(StateChangeEvent)}'s 
JavaDoc, because
+         * passivation due to the effect of state change listeners.
+         */
+        private void passivateStoreAsync()
+        {
+            String threadName = "BDBHANodePassivationThread-" + _name;
+            executeStateChangeAsync(new Callable<Void>()
+            {
+
+                @Override
+                public Void call() throws Exception
+                {
+                    try
+                    {
+                        passivate();
+                    }
+                    catch (Exception e)
+                    {
+                        LOGGER.error("Failed to passivate on hearing REPLICA 
or DETACHED change event",e);
+                        throw e;
+                    }
+                    return null;
+                }
+            }, threadName);
+        }
+
+        private void executeStateChangeAsync(final Callable<Void> callable, 
final String threadName)
+        {
             final RootMessageLogger _rootLogger = 
CurrentActor.get().getRootMessageLogger();
 
             _executor.execute(new Runnable()
             {
-                private static final String _THREAD_NAME = 
"BDBHANodeActivationThread";
 
                 @Override
                 public void run()
                 {
-                    Thread.currentThread().setName(_THREAD_NAME);
-                    CurrentActor.set(new AbstractActor(_rootLogger)
+                    final String originalThreadName = 
Thread.currentThread().getName();
+                    Thread.currentThread().setName(threadName);
+                    try
                     {
-                        @Override
-                        public String getLogMessage()
+                        CurrentActor.set(new AbstractActor(_rootLogger)
                         {
-                            return _THREAD_NAME;
-                        }
-                    });
+                            @Override
+                            public String getLogMessage()
+                            {
+                                return threadName;
+                            }
+                        });
 
-                    try
-                    {
-                        activate();
+                        try
+                        {
+                            callable.call();
+                        }
+                        catch (Exception e)
+                        {
+                            LOGGER.error("Exception during state change", e);
+                        }
                     }
-                    catch (Exception e)
+                    finally
                     {
-                        LOGGER.error("Failed to activate on hearing MASTER 
change event",e);
+                        Thread.currentThread().setName(originalThreadName);
                     }
-
                 }
             });
         }

Modified: 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java?rev=1341584&r1=1341583&r2=1341584&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
 Tue May 22 18:18:50 2012
@@ -76,7 +76,7 @@ public class HAClusterBlackboxTest exten
         // Don't start default broker provided by QBTC.
     }
 
-    public void testLossOfActiveNodeCausesClientToFailover() throws Exception
+    public void testLossOfMasterNodeCausesClientToFailover() throws Exception
     {
         final Connection connection = getConnection(_brokerFailoverUrl);
 
@@ -93,10 +93,11 @@ public class HAClusterBlackboxTest exten
         connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
     }
 
-    public void testLossOfInactiveNodeDoesNotCauseClientToFailover() throws 
Exception
+    public void testLossOfReplicaNodeDoesNotCauseClientToFailover() throws 
Exception
     {
         LOGGER.info("Connecting to " + _brokerFailoverUrl);
         final Connection connection = getConnection(_brokerFailoverUrl);
+        LOGGER.info("Got connection to cluster");
 
         
((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener);
         final int activeBrokerPort = 
_clusterCreator.getBrokerPortNumberFromConnection(connection);

Modified: 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java?rev=1341584&r1=1341583&r2=1341584&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java
 Tue May 22 18:18:50 2012
@@ -22,8 +22,6 @@ package org.apache.qpid.server.store.ber
 import java.io.File;
 import java.io.IOException;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -53,8 +51,6 @@ public class HAClusterWhiteboxTest exten
     private final int NUMBER_OF_NODES = 3;
     private final HATestClusterCreator _clusterCreator = new 
HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
 
-    // TODO Factory refactoring?? // MessageStore construction??
-
     @Override
     protected void setUp() throws Exception
     {
@@ -86,7 +82,7 @@ public class HAClusterWhiteboxTest exten
         {
             try
             {
-                
getConnection(_clusterCreator.getConnectionUrlForSingleNode(brokerPortNumber));
+                
getConnection(_clusterCreator.getConnectionUrlForSingleNodeWithoutRetry(brokerPortNumber));
                 connectionSuccesses++;
             }
             catch(JMSException e)
@@ -105,7 +101,7 @@ public class HAClusterWhiteboxTest exten
         final Connection initialConnection = getConnectionToNodeInCluster();
         assertNotNull(initialConnection);
 
-        killConnectionBrokerAndWaitForNewMasterElection(initialConnection);
+        killConnectionBroker(initialConnection);
 
         final Connection subsequentConnection = getConnectionToNodeInCluster();
         assertNotNull(subsequentConnection);
@@ -121,7 +117,7 @@ public class HAClusterWhiteboxTest exten
         final Connection initialConnection = getConnectionToNodeInCluster();
         assertNotNull(initialConnection);
 
-        killConnectionBrokerAndWaitForNewMasterElection(initialConnection);
+        killConnectionBroker(initialConnection);
 
         final Connection subsequentConnection = getConnectionToNodeInCluster();
         assertNotNull(subsequentConnection);
@@ -159,7 +155,7 @@ public class HAClusterWhiteboxTest exten
 
         populateBrokerWithData(initialConnection, inbuiltExchangeQueueUrl, 
customExchangeQueueUrl);
 
-        killConnectionBrokerAndWaitForNewMasterElection(initialConnection);
+        killConnectionBroker(initialConnection);
 
         final Connection subsequentConnection = getConnectionToNodeInCluster();
 
@@ -168,7 +164,7 @@ public class HAClusterWhiteboxTest exten
         checkBrokerData(subsequentConnection, inbuiltExchangeQueueUrl, 
customExchangeQueueUrl);
     }
 
-    public void testRecoveryOfOutOfDateNode() throws Exception
+    public void xtestRecoveryOfOutOfDateNode() throws Exception
     {
         /*
          * TODO: Implement
@@ -220,7 +216,7 @@ public class HAClusterWhiteboxTest exten
         {
             try
             {
-                connection = 
getConnection(_clusterCreator.getConnectionUrlForSingleNode(brokerPortNumber));
+                connection = 
getConnection(_clusterCreator.getConnectionUrlForSingleNodeWithRetry(brokerPortNumber));
                 break;
             }
             catch(JMSException je)
@@ -231,26 +227,11 @@ public class HAClusterWhiteboxTest exten
         return connection;
     }
 
-    private void killConnectionBrokerAndWaitForNewMasterElection(final 
Connection initialConnection) throws IOException,
+    private void killConnectionBroker(final Connection initialConnection) 
throws IOException,
             InterruptedException
     {
-        try
-        {
-            // NewMasterEvent is received twice: first for the existing master,
-            // second for a new master
-            CountDownLatch newMasterLatch = new CountDownLatch(2);
-            _clusterCreator.startMonitorNode();
-            _clusterCreator.statListeningForNewMasterEvent(newMasterLatch);
-
-            final int initialPortNumber = 
_clusterCreator.getBrokerPortNumberFromConnection(initialConnection);
-            killBroker(initialPortNumber);
-
-            assertTrue("New master was not elected", newMasterLatch.await(30, 
TimeUnit.SECONDS));
-        }
-        finally
-        {
-            _clusterCreator.shutdownMonitor();
-        }
+        final int initialPortNumber = 
_clusterCreator.getBrokerPortNumberFromConnection(initialConnection);
+        killBroker(initialPortNumber); // kill awaits the death of the child
     }
 
     private void assertProducingConsuming(final Connection connection) throws 
JMSException, Exception

Modified: 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java?rev=1341584&r1=1341583&r2=1341584&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
 Tue May 22 18:18:50 2012
@@ -19,7 +19,6 @@
  */
 package org.apache.qpid.server.store.berkeleydb;
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.HashMap;
@@ -32,7 +31,6 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -48,26 +46,22 @@ import org.apache.qpid.client.AMQConnect
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 import org.apache.qpid.url.URLSyntaxException;
 
-import com.sleepycat.je.rep.ReplicationNode;
-import com.sleepycat.je.rep.monitor.GroupChangeEvent;
-import com.sleepycat.je.rep.monitor.JoinGroupEvent;
-import com.sleepycat.je.rep.monitor.LeaveGroupEvent;
-import com.sleepycat.je.rep.monitor.Monitor;
-import com.sleepycat.je.rep.monitor.MonitorChangeListener;
-import com.sleepycat.je.rep.monitor.MonitorConfig;
-import com.sleepycat.je.rep.monitor.NewMasterEvent;
-
 public class HATestClusterCreator
 {
     protected static final Logger LOGGER = 
Logger.getLogger(HATestClusterCreator.class);
 
     private static final String MANY_BROKER_URL_FORMAT = 
"amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''";
     private static final String BROKER_PORTION_FORMAT = 
"tcp://localhost:%d?connectdelay='%d',retries='%d'";
-    private static final String SINGLE_BROKER_URL_FORMAT = 
"amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''";
 
-    private static final int CYCLECOUNT = 2;
-    private static final int RETRIES = 2;
-    private static final int CONNECTDELAY = 1000;
+    private static final int FAILOVER_CYCLECOUNT = 2;
+    private static final int FAILOVER_RETRIES = 2;
+    private static final int FAILOVER_CONNECTDELAY = 1000;
+
+    private static final String SINGLE_BROKER_URL_WITH_RETRY_FORMAT = 
"amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''";
+    private static final String SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT = 
"amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d'";
+
+    private static final int RETRIES = 30;
+    private static final int CONNECTDELAY = 75;
 
     private final QpidBrokerTestCase _testcase;
     private final Map<Integer, Integer> _brokerPortToBdbPortMap = new 
HashMap<Integer, Integer>();
@@ -80,7 +74,6 @@ public class HATestClusterCreator
     private final int _numberOfNodes;
     private int _bdbHelperPort;
     private int _primaryBrokerPort;
-    private Monitor _monitor;
 
     public HATestClusterCreator(QpidBrokerTestCase testcase, String 
virtualHostName, int numberOfNodes)
     {
@@ -216,7 +209,6 @@ public class HATestClusterCreator
 
     public void stopCluster() throws Exception
     {
-        shutdownMonitor();
         for (final Integer brokerPortNumber : _brokerConfigurations.keySet())
         {
             try
@@ -265,19 +257,38 @@ public class HATestClusterCreator
         {
             int brokerPortNumber = itr.next();
 
-            brokerList.append(String.format(BROKER_PORTION_FORMAT, 
brokerPortNumber, CONNECTDELAY, RETRIES));
+            brokerList.append(String.format(BROKER_PORTION_FORMAT, 
brokerPortNumber, FAILOVER_CONNECTDELAY, FAILOVER_RETRIES));
             if (itr.hasNext())
             {
                 brokerList.append(";");
             }
         }
 
-        return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, 
_virtualHostName, brokerList, CYCLECOUNT));
+        return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, 
_virtualHostName, brokerList, FAILOVER_CYCLECOUNT));
+    }
+
+    public AMQConnectionURL getConnectionUrlForSingleNodeWithoutRetry(final 
int brokerPortNumber) throws URLSyntaxException
+    {
+        return getConnectionUrlForSingleNode(brokerPortNumber, false);
+    }
+
+    public AMQConnectionURL getConnectionUrlForSingleNodeWithRetry(final int 
brokerPortNumber) throws URLSyntaxException
+    {
+        return getConnectionUrlForSingleNode(brokerPortNumber, true);
     }
 
-    public AMQConnectionURL getConnectionUrlForSingleNode(final int 
brokerPortNumber) throws URLSyntaxException
+    private AMQConnectionURL getConnectionUrlForSingleNode(final int 
brokerPortNumber, boolean retryAllowed) throws URLSyntaxException
     {
-        String url = String.format(SINGLE_BROKER_URL_FORMAT, _virtualHostName, 
brokerPortNumber, CONNECTDELAY, RETRIES);
+        final String url;
+        if (retryAllowed)
+        {
+            url = String.format(SINGLE_BROKER_URL_WITH_RETRY_FORMAT, 
_virtualHostName, brokerPortNumber, CONNECTDELAY, RETRIES);
+        }
+        else
+        {
+            url = String.format(SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT, 
_virtualHostName, brokerPortNumber);
+        }
+
         return new AMQConnectionURL(url);
     }
 
@@ -343,7 +354,6 @@ public class HATestClusterCreator
         _testcase.setConfigurationProperty(_configKeyPrefix + 
"highAvailability.nodeName", nodeName);
         _testcase.setConfigurationProperty(_configKeyPrefix + 
"highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort));
         _testcase.setConfigurationProperty(_configKeyPrefix + 
"highAvailability.helperHostPort", getHelperHostPort());
-        // TODO replication policy
     }
 
     public String getIpAddressOfBrokerHost()
@@ -403,77 +413,4 @@ public class HATestClusterCreator
         virtualHostConfig.setProperty(configKey, newBdbHostPort);
         collectConfig(brokerPortNumberToBeMoved, 
brokerConfigHolder.getTestConfiguration(), virtualHostConfig);
     }
-
-    public void startMonitorNode()
-    {
-        shutdownMonitor();
-
-        MonitorConfig config = new MonitorConfig();
-        config.setGroupName(_groupName);
-        int monitorPort = _testcase.findFreePort();
-        config.setNodeName(getNodeNameForNodeAt(monitorPort));
-        config.setNodeHostPort("" + monitorPort);
-        config.setHelperHosts(getHelperHostPort());
-
-        _monitor = new Monitor(config);
-
-        ReplicationNode currentMaster = _monitor.register();
-        LOGGER.info("Current master " + currentMaster.getName());
-    }
-
-    public void startListening(MonitorChangeListener listener) throws 
IOException
-    {
-        _monitor.startListener(listener);
-    }
-
-    public void statListeningForNewMasterEvent(final CountDownLatch latch) 
throws IOException
-    {
-        startListening(new MonitorChangeListenerSupport(){
-            @Override
-            public void notify(NewMasterEvent newMasterEvent)
-            {
-                LOGGER.debug("New master is elected " + 
newMasterEvent.getMasterName());
-                latch.countDown();
-            }
-        });
-    }
-
-    public void shutdownMonitor()
-    {
-        if (_monitor != null)
-        {
-            try
-            {
-                _monitor.shutdown();
-            }
-            catch (Exception e)
-            {
-                LOGGER.warn("Monitor shutdown error:", e);
-            }
-        }
-    }
-
-    public static class MonitorChangeListenerSupport implements 
MonitorChangeListener
-    {
-
-        @Override
-        public void notify(NewMasterEvent newMasterEvent)
-        {
-        }
-
-        @Override
-        public void notify(GroupChangeEvent groupChangeEvent)
-        {
-        }
-
-        @Override
-        public void notify(JoinGroupEvent joinGroupEvent)
-        {
-        }
-
-        @Override
-        public void notify(LeaveGroupEvent leaveGroupEvent)
-        {
-        }
-    }
 }

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties?rev=1341584&r1=1341583&r2=1341584&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties
 Tue May 22 18:18:50 2012
@@ -23,4 +23,6 @@ CREATED = VHT-1001 : Created : {0}
 CLOSED = VHT-1002 : Closed
 
 STATS_DATA = VHT-1003 : {0} : {1,choice,0#delivered|1#received} : 
{2,number,#.###} kB/s peak : {3,number,#} bytes total
-STATS_MSGS = VHT-1004 : {0} : {1,choice,0#delivered|1#received} : 
{2,number,#.###} msg/s peak : {3,number,#} msgs total`
\ No newline at end of file
+STATS_MSGS = VHT-1004 : {0} : {1,choice,0#delivered|1#received} : 
{2,number,#.###} msg/s peak : {3,number,#} msgs total
+
+ERRORED = VHT-1005 : Unexpected fatal error
\ No newline at end of file

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java?rev=1341584&r1=1341583&r2=1341584&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java
 Tue May 22 18:18:50 2012
@@ -25,5 +25,7 @@ public enum State
     INITIALISING,
     ACTIVE,
     PASSIVE,
-    STOPPED
+    STOPPED,
+    /** Terminal state that signifies the virtual host has experienced an 
unexpected condition. */
+    ERRORED
 }

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1341584&r1=1341583&r2=1341584&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
 Tue May 22 18:18:50 2012
@@ -122,7 +122,7 @@ public class VirtualHostImpl implements 
 
     private final MessageStore _messageStore;
 
-    private State _state = State.INITIALISING;
+    private volatile State _state = State.INITIALISING;
 
     private boolean _statisticsEnabled = false;
 
@@ -824,17 +824,25 @@ public class VirtualHostImpl implements 
        @Override
        public void event(Event event)
        {
-           initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod());
+           State finalState = State.ERRORED;
            try
            {
-               _brokerMBean.register();
+               
initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod());
+               try
+               {
+                   _brokerMBean.register();
+               }
+               catch (JMException e)
+               {
+                   throw new RuntimeException("Failed to register virtual host 
mbean for virtual host " + getName(), e);
+               }
+               finalState = State.ACTIVE;
            }
-           catch (JMException e)
+           finally
            {
-               throw new RuntimeException("Failed to register virtual host 
mbean for virtual host " + getName(), e);
+               _state = finalState;
+               reportIfError(_state);
            }
-
-           _state = State.ACTIVE;
        }
    }
 
@@ -842,16 +850,33 @@ public class VirtualHostImpl implements 
     {
         public void event(Event event)
         {
-            
_connectionRegistry.close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT);
-            _brokerMBean.unregister();
-            removeHouseKeepingTasks();
+            State finalState = State.ERRORED;
 
-            _queueRegistry.stopAllAndUnregisterMBeans();
-            _exchangeRegistry.clearAndUnregisterMbeans();
-            _dtxRegistry.close();
+            try
+            {
+                /* the approach here is not ideal as there is a race condition 
where a
+                 * queue etc could be created while the virtual host is on the 
way to
+                 * the passivated state.  However the store state change from 
MASTER to UNKNOWN
+                 * is documented as exceptionally rare..
+                 */
+
+                
_connectionRegistry.close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT);
+                _brokerMBean.unregister();
+                removeHouseKeepingTasks();
+
+                _queueRegistry.stopAllAndUnregisterMBeans();
+                _exchangeRegistry.clearAndUnregisterMbeans();
+                _dtxRegistry.close();
 
-            _state = State.PASSIVE;
+                finalState = State.PASSIVE;
+            }
+            finally
+            {
+                _state = finalState;
+                reportIfError(_state);
+            }
         }
+
     }
 
     private final class BeforeCloseListener implements EventListener
@@ -864,6 +889,14 @@ public class VirtualHostImpl implements 
         }
     }
 
+    private void reportIfError(State state)
+    {
+        if (state == State.ERRORED)
+        {
+            CurrentActor.get().message(VirtualHostMessages.ERRORED());
+        }
+    }
+
     private class VirtualHostHouseKeepingTask extends HouseKeepingTask
     {
         public VirtualHostHouseKeepingTask()



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to