Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java?rev=1514360&r1=1514359&r2=1514360&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java Thu Aug 15 16:42:39 2013 @@ -23,6 +23,7 @@ import org.apache.qpid.server.configurat import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.DurableConfigurationRecoverer; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreCreator; @@ -68,7 +69,7 @@ public class StandardVirtualHost extends final MessageStoreLogSubject - storeLogSubject = new MessageStoreLogSubject(this, messageStore.getClass().getSimpleName()); + storeLogSubject = new MessageStoreLogSubject(getName(), messageStore.getClass().getSimpleName()); OperationalLoggingListener.listen(messageStore, storeLogSubject); return messageStore; @@ -96,10 +97,12 @@ public class StandardVirtualHost extends _durableConfigurationStore = initialiseConfigurationStore(virtualHost); - VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getExchangeRegistry(), getExchangeFactory()); - - _durableConfigurationStore.configureConfigStore(getName(), recoveryHandler, virtualHost); + DurableConfigurationRecoverer configRecoverer = + new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(), + new 
DefaultUpgraderProvider(this, getExchangeRegistry())); + _durableConfigurationStore.configureConfigStore(getName(), configRecoverer, virtualHost); + VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getExchangeRegistry(), getExchangeFactory()); _messageStore.configureMessageStore(getName(), recoveryHandler, recoveryHandler); initialiseModel(hostConfig);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1514360&r1=1514359&r2=1514360&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Thu Aug 15 16:42:39 2013 @@ -20,11 +20,7 @@ */ package org.apache.qpid.server.virtualhost; -import java.io.DataInputStream; -import java.io.IOException; -import java.nio.ByteBuffer; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.Map; import java.util.TreeMap; import java.util.UUID; @@ -32,28 +28,16 @@ import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.exchange.FilterSupport; -import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.logging.messages.TransactionLogMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.Binding; -import org.apache.qpid.server.model.LifetimePolicy; -import 
org.apache.qpid.server.model.Queue; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler; -import org.apache.qpid.server.store.ConfiguredObjectRecord; -import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreRecoveryHandler; import org.apache.qpid.server.store.StoredMessage; @@ -65,11 +49,8 @@ import org.apache.qpid.server.txn.DtxReg import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.transport.Xid; import org.apache.qpid.transport.util.Functions; -import org.apache.qpid.util.ByteBufferInputStream; -import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION; - -public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler, +public class VirtualHostConfigRecoveryHandler implements MessageStoreRecoveryHandler, MessageStoreRecoveryHandler.StoredMessageRecoveryHandler, TransactionLogRecoveryHandler, @@ -84,15 +65,11 @@ public class VirtualHostConfigRecoveryHa private final Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>(); private final Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>(); - private final Map<String, Map<UUID, Map<String, Object>>> _configuredObjects = new HashMap<String, Map<UUID, Map<String, Object>>>(); - private final ExchangeRegistry _exchangeRegistry; private final ExchangeFactory _exchangeFactory; private MessageStoreLogSubject _logSubject; private MessageStore _store; - private int _currentConfigVersion; - private DurableConfigurationStore _configStore; public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost, ExchangeRegistry exchangeRegistry, @@ -103,76 +80,14 @@ public class VirtualHostConfigRecoveryHa _exchangeFactory = exchangeFactory; } - @Override - 
public void beginConfigurationRecovery(DurableConfigurationStore store, int configVersion) - { - _logSubject = new MessageStoreLogSubject(_virtualHost,store.getClass().getSimpleName()); - _configStore = store; - _currentConfigVersion = configVersion; - CurrentActor.get().message(_logSubject, ConfigStoreMessages.RECOVERY_START()); - } - public VirtualHostConfigRecoveryHandler begin(MessageStore store) { - _logSubject = new MessageStoreLogSubject(_virtualHost,store.getClass().getSimpleName()); + _logSubject = new MessageStoreLogSubject(_virtualHost.getName(), store.getClass().getSimpleName()); _store = store; CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false)); return this; } - public void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments, UUID alternateExchangeId) - { - try - { - AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueName); - - if (q == null) - { - q = AMQQueueFactory.createAMQQueueImpl(id, queueName, true, owner, false, exclusive, _virtualHost, - FieldTable.convertToMap(arguments)); - _virtualHost.getQueueRegistry().registerQueue(q); - - if (alternateExchangeId != null) - { - Exchange altExchange = _exchangeRegistry.getExchange(alternateExchangeId); - if (altExchange == null) - { - _logger.error("Unknown exchange id " + alternateExchangeId + ", cannot set alternate exchange on queue with id " + id); - return; - } - q.setAlternateExchange(altExchange); - } - } - - CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(queueName, true)); - - //Record that we have a queue for recovery - _queueRecoveries.put(queueName, 0); - } - catch (AMQException e) - { - throw new RuntimeException("Error recovering queue uuid " + id + " name " + queueName, e); - } - } - - public void exchange(UUID id, String exchangeName, String type, boolean autoDelete) - { - try - { - Exchange exchange; - exchange = _exchangeRegistry.getExchange(exchangeName); - if (exchange == 
null) - { - exchange = _exchangeFactory.createExchange(id, exchangeName, type, true, autoDelete); - _exchangeRegistry.registerExchange(exchange); - } - } - catch (AMQException e) - { - throw new RuntimeException("Error recovering exchange uuid " + id + " name " + exchangeName, e); - } - } - public StoredMessageRecoveryHandler begin() { return this; @@ -347,56 +262,6 @@ public class VirtualHostConfigRecoveryHa CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false)); } - private void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingKey, ByteBuffer buf) - { - try - { - Exchange exchange = _exchangeRegistry.getExchange(exchangeId); - if (exchange == null) - { - _logger.error("Unknown exchange id " + exchangeId + ", cannot bind queue with id " + queueId); - return; - } - - AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueId); - if (queue == null) - { - _logger.error("Unknown queue id " + queueId + ", cannot be bound to exchange: " + exchange.getName()); - } - else - { - FieldTable argumentsFT = null; - if(buf != null) - { - try - { - argumentsFT = new FieldTable(new DataInputStream(new ByteBufferInputStream(buf)),buf.limit()); - } - catch (IOException e) - { - throw new RuntimeException("IOException should not be thrown here", e); - } - } - - Map<String, Object> argumentMap = FieldTable.convertToMap(argumentsFT); - - if(exchange.getBinding(bindingKey, queue, argumentMap) == null) - { - - _logger.info("Restoring binding: (Exchange: " + exchange.getNameShortString() + ", Queue: " + queue.getName() - + ", Routing Key: " + bindingKey + ", Arguments: " + argumentsFT + ")"); - - exchange.restoreBinding(bindingId, bindingKey, queue, argumentMap); - } - } - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - - } - public void complete() { } @@ -478,201 +343,6 @@ public class VirtualHostConfigRecoveryHa return this; } - @Override - public void configuredObject(UUID id, String type, 
Map<String, Object> attributes) - { - Map<UUID, Map<String, Object>> typeMap = _configuredObjects.get(type); - if(typeMap == null) - { - typeMap = new HashMap<UUID, Map<String, Object>>(); - _configuredObjects.put(type,typeMap); - } - typeMap.put(id, attributes); - } - - @Override - public int completeConfigurationRecovery() - { - if(CURRENT_CONFIG_VERSION !=_currentConfigVersion) - { - try - { - upgrade(); - } - catch (AMQStoreException e) - { - throw new IllegalArgumentException("Unable to upgrade configuration from version " + _currentConfigVersion + " to version " + CURRENT_CONFIG_VERSION); - } - } - - Map<UUID, Map<String, Object>> exchangeObjects = - _configuredObjects.remove(org.apache.qpid.server.model.Exchange.class.getName()); - - if(exchangeObjects != null) - { - recoverExchanges(exchangeObjects); - } - - Map<UUID, Map<String, Object>> queueObjects = - _configuredObjects.remove(org.apache.qpid.server.model.Queue.class.getName()); - - if(queueObjects != null) - { - recoverQueues(queueObjects); - } - - - Map<UUID, Map<String, Object>> bindingObjects = - _configuredObjects.remove(Binding.class.getName()); - - if(bindingObjects != null) - { - recoverBindings(bindingObjects); - } - - - CurrentActor.get().message(_logSubject, ConfigStoreMessages.RECOVERY_COMPLETE()); - - return CURRENT_CONFIG_VERSION; - } - - private void upgrade() throws AMQStoreException - { - - Map<UUID, String> updates = new HashMap<UUID, String>(); - - final String bindingType = Binding.class.getName(); - - switch(_currentConfigVersion) - { - case 0: - Map<UUID, Map<String, Object>> bindingObjects = - _configuredObjects.get(bindingType); - if(bindingObjects != null) - { - for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingObjects.entrySet()) - { - Map<String, Object> binding = bindingEntry.getValue(); - - if(hasSelectorArguments(binding) && !isTopicExchange(binding)) - { - binding = new LinkedHashMap<String, Object>(binding); - removeSelectorArguments(binding); - 
bindingEntry.setValue(binding); - - updates.put(bindingEntry.getKey(), bindingType); - } - } - } - case CURRENT_CONFIG_VERSION: - if(!updates.isEmpty()) - { - ConfiguredObjectRecord[] updateRecords = new ConfiguredObjectRecord[updates.size()]; - int i = 0; - for(Map.Entry<UUID, String> update : updates.entrySet()) - { - updateRecords[i++] = new ConfiguredObjectRecord(update.getKey(), update.getValue(), _configuredObjects.get(update.getValue()).get(update.getKey())); - } - _configStore.update(updateRecords); - } - break; - default: - throw new IllegalStateException("Unknown configuration model version: " + _currentConfigVersion + ". Are you attempting to run an older instance against an upgraded configuration?"); - } - } - - private void removeSelectorArguments(Map<String, Object> binding) - { - @SuppressWarnings("unchecked") - Map<String, Object> arguments = new LinkedHashMap<String, Object>((Map<String,Object>)binding.get(Binding.ARGUMENTS)); - - FilterSupport.removeFilters(arguments); - binding.put(Binding.ARGUMENTS, arguments); - } - - private boolean isTopicExchange(Map<String, Object> binding) - { - UUID exchangeId = UUID.fromString((String)binding.get(Binding.EXCHANGE)); - final - Map<UUID, Map<String, Object>> exchanges = - _configuredObjects.get(org.apache.qpid.server.model.Exchange.class.getName()); - - if(exchanges != null && exchanges.containsKey(exchangeId)) - { - return "topic".equals(exchanges.get(exchangeId).get(org.apache.qpid.server.model.Exchange.TYPE)); - } - else - { - return _exchangeRegistry.getExchange(exchangeId) != null - && _exchangeRegistry.getExchange(exchangeId).getType() == TopicExchange.TYPE; - } - - } - - private boolean hasSelectorArguments(Map<String, Object> binding) - { - @SuppressWarnings("unchecked") - Map<String, Object> arguments = (Map<String, Object>) binding.get(Binding.ARGUMENTS); - return (arguments != null) && FilterSupport.argumentsContainFilter(arguments); - } - - private void recoverExchanges(Map<UUID, Map<String, 
Object>> exchangeObjects) - { - for(Map.Entry<UUID, Map<String,Object>> entry : exchangeObjects.entrySet()) - { - Map<String,Object> attributeMap = entry.getValue(); - String exchangeName = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.NAME); - String exchangeType = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.TYPE); - String lifeTimePolicy = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY); - boolean autoDelete = lifeTimePolicy == null - || LifetimePolicy.valueOf(lifeTimePolicy) == LifetimePolicy.AUTO_DELETE; - exchange(entry.getKey(), exchangeName, exchangeType, autoDelete); - } - } - - private void recoverQueues(Map<UUID, Map<String, Object>> queueObjects) - { - for(Map.Entry<UUID, Map<String,Object>> entry : queueObjects.entrySet()) - { - Map<String,Object> attributeMap = entry.getValue(); - - String queueName = (String) attributeMap.get(Queue.NAME); - String owner = (String) attributeMap.get(Queue.OWNER); - boolean exclusive = (Boolean) attributeMap.get(Queue.EXCLUSIVE); - UUID alternateExchangeId = attributeMap.get(Queue.ALTERNATE_EXCHANGE) == null ? 
null : UUID.fromString((String)attributeMap.get(Queue.ALTERNATE_EXCHANGE)); - @SuppressWarnings("unchecked") - Map<String, Object> queueArgumentsMap = (Map<String, Object>) attributeMap.get(Queue.ARGUMENTS); - FieldTable arguments = null; - if (queueArgumentsMap != null) - { - arguments = FieldTable.convertToFieldTable(queueArgumentsMap); - } - queue(entry.getKey(), queueName, owner, exclusive, arguments, alternateExchangeId); - } - } - - private void recoverBindings(Map<UUID, Map<String, Object>> bindingObjects) - { - for(Map.Entry<UUID, Map<String,Object>> entry : bindingObjects.entrySet()) - { - Map<String,Object> attributeMap = entry.getValue(); - UUID exchangeId = UUID.fromString((String)attributeMap.get(Binding.EXCHANGE)); - UUID queueId = UUID.fromString((String) attributeMap.get(Binding.QUEUE)); - String bindingName = (String) attributeMap.get(Binding.NAME); - - @SuppressWarnings("unchecked") - Map<String, Object> bindingArgumentsMap = (Map<String, Object>) attributeMap.get(Binding.ARGUMENTS); - FieldTable arguments = null; - if (bindingArgumentsMap != null) - { - arguments = FieldTable.convertToFieldTable(bindingArgumentsMap); - } - ByteBuffer argumentsBB = (arguments == null ? 
null : ByteBuffer.wrap(arguments.getDataAsBytes())); - - binding(entry.getKey(), exchangeId, queueId, bindingName, argumentsBB); - } - } - private static class DummyMessage implements EnqueableMessage { Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java?rev=1514360&r1=1514359&r2=1514360&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java (original) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java Thu Aug 15 16:42:39 2013 @@ -37,7 +37,8 @@ public class MessageStoreLogSubjectTest _testVhost = BrokerTestHelper.createVirtualHost("test"); - _subject = new MessageStoreLogSubject(_testVhost, _testVhost.getMessageStore().getClass().getSimpleName()); + _subject = new MessageStoreLogSubject(_testVhost.getName(), + _testVhost.getMessageStore().getClass().getSimpleName()); } @Override Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java?rev=1514360&r1=1514359&r2=1514360&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java (original) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java Thu Aug 15 16:42:39 2013 @@ -56,6 +56,11 @@ import org.apache.qpid.util.FileUtils; public abstract class 
AbstractDurableConfigurationStoreTestCase extends QpidTestCase { private static final String EXCHANGE_NAME = "exchangeName"; + + private static final String EXCHANGE = org.apache.qpid.server.model.Exchange.class.getSimpleName(); + private static final String BINDING = org.apache.qpid.server.model.Binding.class.getSimpleName(); + private static final String QUEUE = Queue.class.getSimpleName(); + private String _storePath; private String _storeName; private MessageStore _messageStore; @@ -134,7 +139,7 @@ public abstract class AbstractDurableCon DurableConfigurationStoreHelper.createExchange(_configStore, exchange); reopenStore(); - verify(_recoveryHandler).configuredObject(eq(_exchangeId), eq(org.apache.qpid.server.model.Exchange.class.getName()), + verify(_recoveryHandler).configuredObject(eq(_exchangeId), eq(EXCHANGE), eq(map( org.apache.qpid.server.model.Exchange.NAME, getName(), org.apache.qpid.server.model.Exchange.TYPE, getName()+"Type", org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.AUTO_DELETE.toString()))); @@ -186,7 +191,7 @@ public abstract class AbstractDurableCon map.put(org.apache.qpid.server.model.Binding.NAME, ROUTING_KEY); map.put(org.apache.qpid.server.model.Binding.ARGUMENTS,FieldTable.convertToMap(_bindingArgs)); - verify(_recoveryHandler).configuredObject(eq(binding.getId()), eq(org.apache.qpid.server.model.Binding.class.getName()), + verify(_recoveryHandler).configuredObject(eq(binding.getId()), eq(BINDING), eq(map)); } @@ -201,7 +206,7 @@ public abstract class AbstractDurableCon reopenStore(); verify(_recoveryHandler, never()).configuredObject(any(UUID.class), - eq(org.apache.qpid.server.model.Binding.class.getName()), + eq(BINDING), anyMap()); } @@ -215,7 +220,7 @@ public abstract class AbstractDurableCon queueAttributes.put(Queue.NAME, getName()); queueAttributes.put(Queue.OWNER, getName()+"Owner"); queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); - verify(_recoveryHandler).configuredObject(eq(_queueId), 
eq(Queue.class.getName()), eq(queueAttributes)); + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); } public void testCreateQueueAMQQueueFieldTable() throws Exception @@ -238,7 +243,7 @@ public abstract class AbstractDurableCon queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); queueAttributes.put(Queue.ARGUMENTS, attributes); - verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); } public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception @@ -256,7 +261,7 @@ public abstract class AbstractDurableCon queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); - verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); } private Exchange createTestAlternateExchange() @@ -292,7 +297,7 @@ public abstract class AbstractDurableCon queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE); queueAttributes.put(Queue.ARGUMENTS, attributes); - verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); } @@ -323,7 +328,7 @@ public abstract class AbstractDurableCon queueAttributes.put(Queue.ARGUMENTS, attributes); queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); - verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); } public void testRemoveQueue() throws Exception Copied: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java (from r1513398, qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java) URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java?p2=qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java&p1=qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java&r1=1513398&r2=1514360&rev=1514360&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java (original) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java Thu Aug 15 16:42:39 2013 @@ -21,11 +21,14 @@ package org.apache.qpid.server.virtualhost; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; @@ -39,12 +42,15 @@ import org.apache.qpid.server.plugin.Exc import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.store.ConfiguredObjectRecord; +import org.apache.qpid.server.store.DurableConfigurationRecoverer; import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer; import 
org.apache.qpid.test.utils.QpidTestCase; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; @@ -53,17 +59,20 @@ import static org.mockito.Mockito.when; import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION; -public class VirtualHostConfigRecoveryHandlerTest extends QpidTestCase +public class DurableConfigurationRecovererTest extends QpidTestCase { + private static final UUID QUEUE_ID = new UUID(0,0); + private static final UUID TOPIC_EXCHANGE_ID = new UUID(0,1); + private static final UUID DIRECT_EXCHANGE_ID = new UUID(0,2); + private static final String CUSTOM_EXCHANGE_NAME = "customExchange"; + + private DurableConfigurationRecoverer _durableConfigurationRecoverer; private Exchange _directExchange; private Exchange _topicExchange; private VirtualHost _vhost; - private VirtualHostConfigRecoveryHandler _virtualHostConfigRecoveryHandler; private DurableConfigurationStore _store; - - private static final UUID QUEUE_ID = new UUID(0,0); - private static final UUID TOPIC_EXCHANGE_ID = new UUID(0,1); - private static final UUID DIRECT_EXCHANGE_ID = new UUID(0,2); + private ExchangeFactory _exchangeFactory; + private ExchangeRegistry _exchangeRegistry; @Override public void setUp() throws Exception @@ -82,17 +91,31 @@ public class VirtualHostConfigRecoveryHa _vhost = mock(VirtualHost.class); - ExchangeRegistry exchangeRegistry = mock(ExchangeRegistry.class); - when(exchangeRegistry.getExchange(eq(DIRECT_EXCHANGE_ID))).thenReturn(_directExchange); - when(exchangeRegistry.getExchange(eq(TOPIC_EXCHANGE_ID))).thenReturn(_topicExchange); + _exchangeRegistry = mock(ExchangeRegistry.class); + when(_exchangeRegistry.getExchange(eq(DIRECT_EXCHANGE_ID))).thenReturn(_directExchange); + 
when(_exchangeRegistry.getExchange(eq(TOPIC_EXCHANGE_ID))).thenReturn(_topicExchange); QueueRegistry queueRegistry = mock(QueueRegistry.class); when(_vhost.getQueueRegistry()).thenReturn(queueRegistry); when(queueRegistry.getQueue(eq(QUEUE_ID))).thenReturn(queue); - ExchangeFactory exchangeFactory = mock(ExchangeFactory.class); - _virtualHostConfigRecoveryHandler = new VirtualHostConfigRecoveryHandler(_vhost, exchangeRegistry, exchangeFactory); + _exchangeFactory = mock(ExchangeFactory.class); + + DurableConfiguredObjectRecoverer[] recoverers = { + new QueueRecoverer(_vhost, _exchangeRegistry), + new ExchangeRecoverer(_exchangeRegistry, _exchangeFactory), + new BindingRecoverer(_vhost, _exchangeRegistry) + }; + + final Map<String, DurableConfiguredObjectRecoverer> recovererMap= new HashMap<String, DurableConfiguredObjectRecoverer>(); + for(DurableConfiguredObjectRecoverer recoverer : recoverers) + { + recovererMap.put(recoverer.getType(), recoverer); + } + _durableConfigurationRecoverer = + new DurableConfigurationRecoverer(_vhost.getName(), recovererMap, + new DefaultUpgraderProvider(_vhost, _exchangeRegistry)); _store = mock(DurableConfigurationStore.class); @@ -101,16 +124,18 @@ public class VirtualHostConfigRecoveryHa public void testUpgradeEmptyStore() throws Exception { - _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0); - assertEquals("Did not upgrade to the expected version", CURRENT_CONFIG_VERSION, _virtualHostConfigRecoveryHandler.completeConfigurationRecovery()); + _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0); + assertEquals("Did not upgrade to the expected version", + CURRENT_CONFIG_VERSION, + _durableConfigurationRecoverer.completeConfigurationRecovery()); } public void testUpgradeNewerStoreFails() throws Exception { try { - _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, CURRENT_CONFIG_VERSION+1); - _virtualHostConfigRecoveryHandler.completeConfigurationRecovery(); + 
_durableConfigurationRecoverer.beginConfigurationRecovery(_store, CURRENT_CONFIG_VERSION + 1); + _durableConfigurationRecoverer.completeConfigurationRecovery(); fail("Should not be able to start when config model is newer than current"); } catch (IllegalStateException e) @@ -122,20 +147,24 @@ public class VirtualHostConfigRecoveryHa public void testUpgradeRemovesBindingsToNonTopicExchanges() throws Exception { - _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0); + _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0); - _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0), - "org.apache.qpid.server.model.Binding", - createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble")); + _durableConfigurationRecoverer.configuredObject(new UUID(1, 0), + "org.apache.qpid.server.model.Binding", + createBinding("key", + DIRECT_EXCHANGE_ID, + QUEUE_ID, + "x-filter-jms-selector", + "wibble")); final ConfiguredObjectRecord[] expected = { - new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding", + new ConfiguredObjectRecord(new UUID(1, 0), "Binding", createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID)) }; verifyCorrectUpdates(expected); - _virtualHostConfigRecoveryHandler.completeConfigurationRecovery(); + _durableConfigurationRecoverer.completeConfigurationRecovery(); } @@ -143,69 +172,150 @@ public class VirtualHostConfigRecoveryHa public void testUpgradeOnlyRemovesSelectorBindings() throws Exception { - _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0); - - _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0), - "org.apache.qpid.server.model.Binding", - createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble", "not-a-selector", "moo")); - - - UUID customExchangeId = new UUID(3,0); - - _virtualHostConfigRecoveryHandler.configuredObject(new UUID(2, 0), - "org.apache.qpid.server.model.Binding", - createBinding("key", 
customExchangeId, QUEUE_ID, "x-filter-jms-selector", "wibble", "not-a-selector", "moo")); - - _virtualHostConfigRecoveryHandler.configuredObject(customExchangeId, - "org.apache.qpid.server.model.Exchange", - createExchange("customExchange", HeadersExchange.TYPE)); - + _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0); + _durableConfigurationRecoverer.configuredObject(new UUID(1, 0), + "org.apache.qpid.server.model.Binding", + createBinding("key", + DIRECT_EXCHANGE_ID, + QUEUE_ID, + "x-filter-jms-selector", + "wibble", + "not-a-selector", + "moo")); + + + final UUID customExchangeId = new UUID(3,0); + + _durableConfigurationRecoverer.configuredObject(new UUID(2, 0), + "org.apache.qpid.server.model.Binding", + createBinding("key", + customExchangeId, + QUEUE_ID, + "x-filter-jms-selector", + "wibble", + "not-a-selector", + "moo")); + + _durableConfigurationRecoverer.configuredObject(customExchangeId, + "org.apache.qpid.server.model.Exchange", + createExchange(CUSTOM_EXCHANGE_NAME, HeadersExchange.TYPE)); + + final Exchange customExchange = mock(Exchange.class); + + when(_exchangeFactory.createExchange(eq(customExchangeId), + eq(CUSTOM_EXCHANGE_NAME), + eq(HeadersExchange.TYPE.getType()), + anyBoolean(), + anyBoolean())).thenReturn(customExchange); + doAnswer(new Answer() + { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + when(_exchangeRegistry.getExchange(eq(customExchangeId))).thenReturn(customExchange); + return null; + } + }).when(_exchangeRegistry).registerExchange(customExchange); final ConfiguredObjectRecord[] expected = { new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding", createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "not-a-selector", "moo")), - new ConfiguredObjectRecord(new UUID(3, 0), "org.apache.qpid.server.model.Binding", + new ConfiguredObjectRecord(new UUID(2, 0), "org.apache.qpid.server.model.Binding", createBinding("key", customExchangeId, QUEUE_ID, 
"not-a-selector", "moo")) }; verifyCorrectUpdates(expected); - _virtualHostConfigRecoveryHandler.completeConfigurationRecovery(); + _durableConfigurationRecoverer.completeConfigurationRecovery(); } public void testUpgradeKeepsBindingsToTopicExchanges() throws Exception { - _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0); + _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0); - _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0), - "org.apache.qpid.server.model.Binding", - createBinding("key", TOPIC_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble")); + _durableConfigurationRecoverer.configuredObject(new UUID(1, 0), + "org.apache.qpid.server.model.Binding", + createBinding("key", + TOPIC_EXCHANGE_ID, + QUEUE_ID, + "x-filter-jms-selector", + "wibble")); final ConfiguredObjectRecord[] expected = { - new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding", + new ConfiguredObjectRecord(new UUID(1, 0), "Binding", createBinding("key", TOPIC_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble")) }; verifyCorrectUpdates(expected); - _virtualHostConfigRecoveryHandler.completeConfigurationRecovery(); + _durableConfigurationRecoverer.completeConfigurationRecovery(); } public void testUpgradeDoesNotRecur() throws Exception { - _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 1); + _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2); - _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0), - "org.apache.qpid.server.model.Binding", - createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble")); + _durableConfigurationRecoverer.configuredObject(new UUID(1, 0), + "Binding", + createBinding("key", + DIRECT_EXCHANGE_ID, + QUEUE_ID, + "x-filter-jms-selector", + "wibble")); doThrow(new RuntimeException("Update Should not be called")).when(_store).update(any(ConfiguredObjectRecord[].class)); - 
_virtualHostConfigRecoveryHandler.completeConfigurationRecovery(); + _durableConfigurationRecoverer.completeConfigurationRecovery(); + } + + public void testFailsWithUnresolvedObjects() + { + _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2); + + + _durableConfigurationRecoverer.configuredObject(new UUID(1, 0), + "Binding", + createBinding("key", + new UUID(3,0), + QUEUE_ID, + "x-filter-jms-selector", + "wibble")); + + try + { + _durableConfigurationRecoverer.completeConfigurationRecovery(); + fail("Expected resolution to fail due to unknown object"); + } + catch(IllegalConfigurationException e) + { + assertEquals("Durable configuration has unresolved dependencies", e.getMessage()); + } + + } + + public void testFailsWithUnknownObjectType() + { + _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2); + + + try + { + final Map<String, Object> emptyArguments = Collections.emptyMap(); + _durableConfigurationRecoverer.configuredObject(new UUID(1, 0), + "Wibble", emptyArguments); + _durableConfigurationRecoverer.completeConfigurationRecovery(); + fail("Expected resolution to fail due to unknown object type"); + } + catch(IllegalConfigurationException e) + { + assertEquals("Unkown type for configured object: Wibble", e.getMessage()); + } + + } private void verifyCorrectUpdates(final ConfiguredObjectRecord[] expected) throws AMQStoreException Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java?rev=1514360&r1=1514359&r2=1514360&view=diff ============================================================================== --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java (original) +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java Thu Aug 
15 16:42:39 2013 @@ -466,7 +466,7 @@ public class FailoverBehaviourTest exten try { - // an implicit recover performed when acknowledge throws an exception due to failover + // an implicit recover performed when acknowledge throws an exception due to failover lastMessage.acknowledge(); fail("JMSException should be thrown"); } @@ -529,7 +529,7 @@ public class FailoverBehaviourTest exten Message lastMessage = consumeMessages(); try { - // an implicit recover performed when acknowledge throws an exception due to failover + // an implicit recover performed when acknowledge throws an exception due to failover lastMessage.acknowledge(); fail("JMSException should be thrown"); } @@ -923,9 +923,9 @@ public class FailoverBehaviourTest exten final Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-qpid-capacity", capacity); arguments.put("x-qpid-flow-resume-capacity", resumeCapacity); - ((AMQSession<?, ?>) session).createQueue(new AMQShortString(queueName), true, true, false, arguments); + ((AMQSession<?, ?>) session).createQueue(new AMQShortString(queueName), false, true, false, arguments); Queue queue = session.createQueue("direct://amq.direct/" + queueName + "/" + queueName + "?durable='" + true - + "'&autodelete='" + true + "'"); + + "'&autodelete='" + false + "'"); ((AMQSession<?, ?>) session).declareAndBind((AMQDestination) queue); return queue; } Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java?rev=1514360&r1=1514359&r2=1514360&view=diff ============================================================================== --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java (original) +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java Thu Aug 15 16:42:39 2013 @@ -190,6 
+190,15 @@ public class SlowMessageStore implements } @Override + public UUID[] removeConfiguredObjects(final UUID... objects) throws AMQStoreException + { + doPreDelay("remove"); + UUID[] removed = _durableConfigurationStore.removeConfiguredObjects(objects); + doPostDelay("remove"); + return removed; + } + + @Override public void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException { doPreDelay("update"); @@ -205,6 +214,14 @@ public class SlowMessageStore implements doPostDelay("update"); } + @Override + public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws AMQStoreException + { + doPreDelay("update"); + _durableConfigurationStore.update(createIfNecessary, records); + doPostDelay("update"); + } + public Transaction newTransaction() { doPreDelay("beginTran"); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org
