Author: kwall Date: Tue May 22 18:18:50 2012 New Revision: 1341584 URL: http://svn.apache.org/viewvc?rev=1341584&view=rev Log: QPID-4006: BDB HA. Make passivation async to avoid holding up the BDB thread. Introduce VirtualHost ERRORED state to be used when a virtual host is unable to activate or passivate itself completely. Change MULTISYNC mode to use WRITE_NO_SYNC. Stop relying on Monitor nodes to perform some tests.
Work of Robbie Gemmell <[email protected]> and myself. squashme Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java?rev=1341584&r1=1341583&r2=1341584&view=diff ============================================================================== --- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java (original) +++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java Tue May 22 18:18:50 2012 @@ -27,6 +27,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -67,7 +68,7 @@ public class BDBHAMessageStore extends A { private static final String MUTLI_SYNC = "MUTLI_SYNC"; private static final String DEFAULT_REPLICATION_POLICY = - MUTLI_SYNC + "," + SyncPolicy.NO_SYNC.name() + "," + ReplicaAckPolicy.SIMPLE_MAJORITY.name(); + MUTLI_SYNC + "," + 
SyncPolicy.WRITE_NO_SYNC.name() + "," + ReplicaAckPolicy.SIMPLE_MAJORITY.name(); private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class); @@ -107,7 +108,7 @@ public class BDBHAMessageStore extends A if(_replicationPolicy.startsWith(MUTLI_SYNC)) { - _replicationDurability = Durability.parse(_replicationPolicy.replaceFirst(MUTLI_SYNC, SyncPolicy.SYNC.name())); + _replicationDurability = Durability.parse(_replicationPolicy.replaceFirst(MUTLI_SYNC, SyncPolicy.WRITE_NO_SYNC.name())); _localMultiSyncCommits = true; } else @@ -388,11 +389,11 @@ public class BDBHAMessageStore extends A activateStoreAsync(); break; case REPLICA: - passivateStore(); + passivateStoreAsync(); break; case DETACHED: LOGGER.error("BDB replicated node in detached state, therefore passivating."); - passivateStore(); + passivateStoreAsync(); break; case UNKNOWN: LOGGER.warn("BDB replicated node in unknown state (hopefully temporarily)"); @@ -403,20 +404,6 @@ public class BDBHAMessageStore extends A } } - /** synchronously calls passivate. This is acceptable because {@link HAMessageStore#passivate()} is expected to be fast */ - private void passivateStore() - { - try - { - passivate(); - } - catch(Exception e) - { - LOGGER.error("Unable to passivate", e); - throw new RuntimeException("Unable to passivate", e); - } - } - /** * Calls {@link MessageStore#activate()}. * @@ -429,34 +416,93 @@ public class BDBHAMessageStore extends A */ private void activateStoreAsync() { + String threadName = "BDBHANodeActivationThread-" + _name; + executeStateChangeAsync(new Callable<Void>() + { + @Override + public Void call() throws Exception + { + try + { + activate(); + } + catch (Exception e) + { + LOGGER.error("Failed to activate on hearing MASTER change event",e); + throw e; + } + return null; + } + }, threadName); + } + + /** + * Calls {@link #passivate()}. 
+ * + * <p/> + * This is done a background thread, in line with + * {@link StateChangeListener#stateChange(StateChangeEvent)}'s JavaDoc, because + * passivation due to the effect of state change listeners. + */ + private void passivateStoreAsync() + { + String threadName = "BDBHANodePassivationThread-" + _name; + executeStateChangeAsync(new Callable<Void>() + { + + @Override + public Void call() throws Exception + { + try + { + passivate(); + } + catch (Exception e) + { + LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event",e); + throw e; + } + return null; + } + }, threadName); + } + + private void executeStateChangeAsync(final Callable<Void> callable, final String threadName) + { final RootMessageLogger _rootLogger = CurrentActor.get().getRootMessageLogger(); _executor.execute(new Runnable() { - private static final String _THREAD_NAME = "BDBHANodeActivationThread"; @Override public void run() { - Thread.currentThread().setName(_THREAD_NAME); - CurrentActor.set(new AbstractActor(_rootLogger) + final String originalThreadName = Thread.currentThread().getName(); + Thread.currentThread().setName(threadName); + try { - @Override - public String getLogMessage() + CurrentActor.set(new AbstractActor(_rootLogger) { - return _THREAD_NAME; - } - }); + @Override + public String getLogMessage() + { + return threadName; + } + }); - try - { - activate(); + try + { + callable.call(); + } + catch (Exception e) + { + LOGGER.error("Exception during state change", e); + } } - catch (Exception e) + finally { - LOGGER.error("Failed to activate on hearing MASTER change event",e); + Thread.currentThread().setName(originalThreadName); } - } }); } Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java?rev=1341584&r1=1341583&r2=1341584&view=diff 
============================================================================== --- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java (original) +++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java Tue May 22 18:18:50 2012 @@ -76,7 +76,7 @@ public class HAClusterBlackboxTest exten // Don't start default broker provided by QBTC. } - public void testLossOfActiveNodeCausesClientToFailover() throws Exception + public void testLossOfMasterNodeCausesClientToFailover() throws Exception { final Connection connection = getConnection(_brokerFailoverUrl); @@ -93,10 +93,11 @@ public class HAClusterBlackboxTest exten connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } - public void testLossOfInactiveNodeDoesNotCauseClientToFailover() throws Exception + public void testLossOfReplicaNodeDoesNotCauseClientToFailover() throws Exception { LOGGER.info("Connecting to " + _brokerFailoverUrl); final Connection connection = getConnection(_brokerFailoverUrl); + LOGGER.info("Got connection to cluster"); ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener); final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java?rev=1341584&r1=1341583&r2=1341584&view=diff ============================================================================== --- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java (original) +++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java Tue May 22 18:18:50 2012 @@ -22,8 +22,6 @@ package 
org.apache.qpid.server.store.ber import java.io.File; import java.io.IOException; import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.Destination; @@ -53,8 +51,6 @@ public class HAClusterWhiteboxTest exten private final int NUMBER_OF_NODES = 3; private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); - // TODO Factory refactoring?? // MessageStore construction?? - @Override protected void setUp() throws Exception { @@ -86,7 +82,7 @@ public class HAClusterWhiteboxTest exten { try { - getConnection(_clusterCreator.getConnectionUrlForSingleNode(brokerPortNumber)); + getConnection(_clusterCreator.getConnectionUrlForSingleNodeWithoutRetry(brokerPortNumber)); connectionSuccesses++; } catch(JMSException e) @@ -105,7 +101,7 @@ public class HAClusterWhiteboxTest exten final Connection initialConnection = getConnectionToNodeInCluster(); assertNotNull(initialConnection); - killConnectionBrokerAndWaitForNewMasterElection(initialConnection); + killConnectionBroker(initialConnection); final Connection subsequentConnection = getConnectionToNodeInCluster(); assertNotNull(subsequentConnection); @@ -121,7 +117,7 @@ public class HAClusterWhiteboxTest exten final Connection initialConnection = getConnectionToNodeInCluster(); assertNotNull(initialConnection); - killConnectionBrokerAndWaitForNewMasterElection(initialConnection); + killConnectionBroker(initialConnection); final Connection subsequentConnection = getConnectionToNodeInCluster(); assertNotNull(subsequentConnection); @@ -159,7 +155,7 @@ public class HAClusterWhiteboxTest exten populateBrokerWithData(initialConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl); - killConnectionBrokerAndWaitForNewMasterElection(initialConnection); + killConnectionBroker(initialConnection); final Connection subsequentConnection = getConnectionToNodeInCluster(); @@ -168,7 +164,7 @@ 
public class HAClusterWhiteboxTest exten checkBrokerData(subsequentConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl); } - public void testRecoveryOfOutOfDateNode() throws Exception + public void xtestRecoveryOfOutOfDateNode() throws Exception { /* * TODO: Implement @@ -220,7 +216,7 @@ public class HAClusterWhiteboxTest exten { try { - connection = getConnection(_clusterCreator.getConnectionUrlForSingleNode(brokerPortNumber)); + connection = getConnection(_clusterCreator.getConnectionUrlForSingleNodeWithRetry(brokerPortNumber)); break; } catch(JMSException je) @@ -231,26 +227,11 @@ public class HAClusterWhiteboxTest exten return connection; } - private void killConnectionBrokerAndWaitForNewMasterElection(final Connection initialConnection) throws IOException, + private void killConnectionBroker(final Connection initialConnection) throws IOException, InterruptedException { - try - { - // NewMasterEvent is received twice: first for the existing master, - // second for a new master - CountDownLatch newMasterLatch = new CountDownLatch(2); - _clusterCreator.startMonitorNode(); - _clusterCreator.statListeningForNewMasterEvent(newMasterLatch); - - final int initialPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection); - killBroker(initialPortNumber); - - assertTrue("New master was not elected", newMasterLatch.await(30, TimeUnit.SECONDS)); - } - finally - { - _clusterCreator.shutdownMonitor(); - } + final int initialPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection); + killBroker(initialPortNumber); // kill awaits the death of the child } private void assertProducingConsuming(final Connection connection) throws JMSException, Exception Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java?rev=1341584&r1=1341583&r2=1341584&view=diff ============================================================================== --- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java (original) +++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java Tue May 22 18:18:50 2012 @@ -19,7 +19,6 @@ */ package org.apache.qpid.server.store.berkeleydb; -import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.HashMap; @@ -32,7 +31,6 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -48,26 +46,22 @@ import org.apache.qpid.client.AMQConnect import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.url.URLSyntaxException; -import com.sleepycat.je.rep.ReplicationNode; -import com.sleepycat.je.rep.monitor.GroupChangeEvent; -import com.sleepycat.je.rep.monitor.JoinGroupEvent; -import com.sleepycat.je.rep.monitor.LeaveGroupEvent; -import com.sleepycat.je.rep.monitor.Monitor; -import com.sleepycat.je.rep.monitor.MonitorChangeListener; -import com.sleepycat.je.rep.monitor.MonitorConfig; -import com.sleepycat.je.rep.monitor.NewMasterEvent; - public class HATestClusterCreator { protected static final Logger LOGGER = Logger.getLogger(HATestClusterCreator.class); private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''"; private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'"; - private static 
final String SINGLE_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''"; - private static final int CYCLECOUNT = 2; - private static final int RETRIES = 2; - private static final int CONNECTDELAY = 1000; + private static final int FAILOVER_CYCLECOUNT = 2; + private static final int FAILOVER_RETRIES = 2; + private static final int FAILOVER_CONNECTDELAY = 1000; + + private static final String SINGLE_BROKER_URL_WITH_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''"; + private static final String SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d'"; + + private static final int RETRIES = 30; + private static final int CONNECTDELAY = 75; private final QpidBrokerTestCase _testcase; private final Map<Integer, Integer> _brokerPortToBdbPortMap = new HashMap<Integer, Integer>(); @@ -80,7 +74,6 @@ public class HATestClusterCreator private final int _numberOfNodes; private int _bdbHelperPort; private int _primaryBrokerPort; - private Monitor _monitor; public HATestClusterCreator(QpidBrokerTestCase testcase, String virtualHostName, int numberOfNodes) { @@ -216,7 +209,6 @@ public class HATestClusterCreator public void stopCluster() throws Exception { - shutdownMonitor(); for (final Integer brokerPortNumber : _brokerConfigurations.keySet()) { try @@ -265,19 +257,38 @@ public class HATestClusterCreator { int brokerPortNumber = itr.next(); - brokerList.append(String.format(BROKER_PORTION_FORMAT, brokerPortNumber, CONNECTDELAY, RETRIES)); + brokerList.append(String.format(BROKER_PORTION_FORMAT, brokerPortNumber, FAILOVER_CONNECTDELAY, FAILOVER_RETRIES)); if (itr.hasNext()) { brokerList.append(";"); } } - return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, CYCLECOUNT)); + return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, FAILOVER_CYCLECOUNT)); 
+ } + + public AMQConnectionURL getConnectionUrlForSingleNodeWithoutRetry(final int brokerPortNumber) throws URLSyntaxException + { + return getConnectionUrlForSingleNode(brokerPortNumber, false); + } + + public AMQConnectionURL getConnectionUrlForSingleNodeWithRetry(final int brokerPortNumber) throws URLSyntaxException + { + return getConnectionUrlForSingleNode(brokerPortNumber, true); } - public AMQConnectionURL getConnectionUrlForSingleNode(final int brokerPortNumber) throws URLSyntaxException + private AMQConnectionURL getConnectionUrlForSingleNode(final int brokerPortNumber, boolean retryAllowed) throws URLSyntaxException { - String url = String.format(SINGLE_BROKER_URL_FORMAT, _virtualHostName, brokerPortNumber, CONNECTDELAY, RETRIES); + final String url; + if (retryAllowed) + { + url = String.format(SINGLE_BROKER_URL_WITH_RETRY_FORMAT, _virtualHostName, brokerPortNumber, CONNECTDELAY, RETRIES); + } + else + { + url = String.format(SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT, _virtualHostName, brokerPortNumber); + } + return new AMQConnectionURL(url); } @@ -343,7 +354,6 @@ public class HATestClusterCreator _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.nodeName", nodeName); _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort)); _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.helperHostPort", getHelperHostPort()); - // TODO replication policy } public String getIpAddressOfBrokerHost() @@ -403,77 +413,4 @@ public class HATestClusterCreator virtualHostConfig.setProperty(configKey, newBdbHostPort); collectConfig(brokerPortNumberToBeMoved, brokerConfigHolder.getTestConfiguration(), virtualHostConfig); } - - public void startMonitorNode() - { - shutdownMonitor(); - - MonitorConfig config = new MonitorConfig(); - config.setGroupName(_groupName); - int monitorPort = _testcase.findFreePort(); - config.setNodeName(getNodeNameForNodeAt(monitorPort)); - 
config.setNodeHostPort("" + monitorPort); - config.setHelperHosts(getHelperHostPort()); - - _monitor = new Monitor(config); - - ReplicationNode currentMaster = _monitor.register(); - LOGGER.info("Current master " + currentMaster.getName()); - } - - public void startListening(MonitorChangeListener listener) throws IOException - { - _monitor.startListener(listener); - } - - public void statListeningForNewMasterEvent(final CountDownLatch latch) throws IOException - { - startListening(new MonitorChangeListenerSupport(){ - @Override - public void notify(NewMasterEvent newMasterEvent) - { - LOGGER.debug("New master is elected " + newMasterEvent.getMasterName()); - latch.countDown(); - } - }); - } - - public void shutdownMonitor() - { - if (_monitor != null) - { - try - { - _monitor.shutdown(); - } - catch (Exception e) - { - LOGGER.warn("Monitor shutdown error:", e); - } - } - } - - public static class MonitorChangeListenerSupport implements MonitorChangeListener - { - - @Override - public void notify(NewMasterEvent newMasterEvent) - { - } - - @Override - public void notify(GroupChangeEvent groupChangeEvent) - { - } - - @Override - public void notify(JoinGroupEvent joinGroupEvent) - { - } - - @Override - public void notify(LeaveGroupEvent leaveGroupEvent) - { - } - } } Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties?rev=1341584&r1=1341583&r2=1341584&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties Tue May 22 18:18:50 2012 @@ -23,4 +23,6 @@ CREATED = VHT-1001 
: Created : {0} CLOSED = VHT-1002 : Closed STATS_DATA = VHT-1003 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} kB/s peak : {3,number,#} bytes total -STATS_MSGS = VHT-1004 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} msg/s peak : {3,number,#} msgs total` \ No newline at end of file +STATS_MSGS = VHT-1004 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} msg/s peak : {3,number,#} msgs total + +ERRORED = VHT-1005 : Unexpected fatal error \ No newline at end of file Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java?rev=1341584&r1=1341583&r2=1341584&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java Tue May 22 18:18:50 2012 @@ -25,5 +25,7 @@ public enum State INITIALISING, ACTIVE, PASSIVE, - STOPPED + STOPPED, + /** Terminal state that signifies the virtual host has experienced an unexpected condition. 
*/ + ERRORED } Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1341584&r1=1341583&r2=1341584&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java Tue May 22 18:18:50 2012 @@ -122,7 +122,7 @@ public class VirtualHostImpl implements private final MessageStore _messageStore; - private State _state = State.INITIALISING; + private volatile State _state = State.INITIALISING; private boolean _statisticsEnabled = false; @@ -824,17 +824,25 @@ public class VirtualHostImpl implements @Override public void event(Event event) { - initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod()); + State finalState = State.ERRORED; try { - _brokerMBean.register(); + initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod()); + try + { + _brokerMBean.register(); + } + catch (JMException e) + { + throw new RuntimeException("Failed to register virtual host mbean for virtual host " + getName(), e); + } + finalState = State.ACTIVE; } - catch (JMException e) + finally { - throw new RuntimeException("Failed to register virtual host mbean for virtual host " + getName(), e); + _state = finalState; + reportIfError(_state); } - - _state = State.ACTIVE; } } @@ -842,16 +850,33 @@ public class VirtualHostImpl implements { public void event(Event event) { - _connectionRegistry.close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT); - _brokerMBean.unregister(); - removeHouseKeepingTasks(); + State finalState = State.ERRORED; - _queueRegistry.stopAllAndUnregisterMBeans(); - _exchangeRegistry.clearAndUnregisterMbeans(); - _dtxRegistry.close(); + try + 
{ + /* the approach here is not ideal as there is a race condition where a + * queue etc could be created while the virtual host is on the way to + * the passivated state. However the store state change from MASTER to UNKNOWN + * is documented as exceptionally rare.. + */ + + _connectionRegistry.close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT); + _brokerMBean.unregister(); + removeHouseKeepingTasks(); + + _queueRegistry.stopAllAndUnregisterMBeans(); + _exchangeRegistry.clearAndUnregisterMbeans(); + _dtxRegistry.close(); - _state = State.PASSIVE; + finalState = State.PASSIVE; + } + finally + { + _state = finalState; + reportIfError(_state); + } } + } private final class BeforeCloseListener implements EventListener @@ -864,6 +889,14 @@ public class VirtualHostImpl implements } } + private void reportIfError(State state) + { + if (state == State.ERRORED) + { + CurrentActor.get().message(VirtualHostMessages.ERRORED()); + } + } + private class VirtualHostHouseKeepingTask extends HouseKeepingTask { public VirtualHostHouseKeepingTask() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
