Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java?rev=1514360&r1=1514359&r2=1514360&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java Thu Aug 15 16:42:39 2013
@@ -23,6 +23,7 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.DurableConfigurationRecoverer;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.MessageStoreCreator;
@@ -68,7 +69,7 @@ public class StandardVirtualHost extends
 
         final
         MessageStoreLogSubject
-                storeLogSubject = new MessageStoreLogSubject(this, messageStore.getClass().getSimpleName());
+                storeLogSubject = new MessageStoreLogSubject(getName(), messageStore.getClass().getSimpleName());
         OperationalLoggingListener.listen(messageStore, storeLogSubject);
 
         return messageStore;
@@ -96,10 +97,12 @@ public class StandardVirtualHost extends
 
         _durableConfigurationStore = initialiseConfigurationStore(virtualHost);
 
-        VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getExchangeRegistry(), getExchangeFactory());
-
-        _durableConfigurationStore.configureConfigStore(getName(), recoveryHandler, virtualHost);
+        DurableConfigurationRecoverer configRecoverer =
+                new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
+                                                  new DefaultUpgraderProvider(this, getExchangeRegistry()));
+        _durableConfigurationStore.configureConfigStore(getName(), configRecoverer, virtualHost);
 
+        VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getExchangeRegistry(), getExchangeFactory());
         _messageStore.configureMessageStore(getName(), recoveryHandler, recoveryHandler);
 
         initialiseModel(hostConfig);

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1514360&r1=1514359&r2=1514360&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
 Thu Aug 15 16:42:39 2013
@@ -20,11 +20,7 @@
 */
 package org.apache.qpid.server.virtualhost;
 
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
@@ -32,28 +28,16 @@ import java.util.UUID;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeFactory;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.FilterSupport;
-import org.apache.qpid.server.exchange.TopicExchange;
 import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
 import org.apache.qpid.server.logging.messages.TransactionLogMessages;
 import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
 import org.apache.qpid.server.message.EnqueableMessage;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.Binding;
-import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
-import org.apache.qpid.server.store.ConfiguredObjectRecord;
-import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
 import org.apache.qpid.server.store.StoredMessage;
@@ -65,11 +49,8 @@ import org.apache.qpid.server.txn.DtxReg
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.transport.Xid;
 import org.apache.qpid.transport.util.Functions;
-import org.apache.qpid.util.ByteBufferInputStream;
 
-import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION;
-
-public class VirtualHostConfigRecoveryHandler implements 
ConfigurationRecoveryHandler,
+public class VirtualHostConfigRecoveryHandler implements
                                                         
MessageStoreRecoveryHandler,
                                                         
MessageStoreRecoveryHandler.StoredMessageRecoveryHandler,
                                                         
TransactionLogRecoveryHandler,
@@ -84,15 +65,11 @@ public class VirtualHostConfigRecoveryHa
     private final Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>();
     private final Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
 
-    private final Map<String, Map<UUID, Map<String, Object>>> 
_configuredObjects = new HashMap<String, Map<UUID, Map<String, Object>>>();
-
     private final ExchangeRegistry _exchangeRegistry;
     private final ExchangeFactory _exchangeFactory;
 
     private MessageStoreLogSubject _logSubject;
     private MessageStore _store;
-    private int _currentConfigVersion;
-    private DurableConfigurationStore _configStore;
 
     public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost,
                                             ExchangeRegistry exchangeRegistry,
@@ -103,76 +80,14 @@ public class VirtualHostConfigRecoveryHa
         _exchangeFactory = exchangeFactory;
     }
 
-    @Override
-    public void beginConfigurationRecovery(DurableConfigurationStore store, 
int configVersion)
-    {
-        _logSubject = new 
MessageStoreLogSubject(_virtualHost,store.getClass().getSimpleName());
-        _configStore = store;
-        _currentConfigVersion = configVersion;
-        CurrentActor.get().message(_logSubject, 
ConfigStoreMessages.RECOVERY_START());
-    }
-
     public VirtualHostConfigRecoveryHandler begin(MessageStore store)
     {
-        _logSubject = new MessageStoreLogSubject(_virtualHost,store.getClass().getSimpleName());
+        _logSubject = new MessageStoreLogSubject(_virtualHost.getName(), store.getClass().getSimpleName());
         _store = store;
         CurrentActor.get().message(_logSubject, 
TransactionLogMessages.RECOVERY_START(null, false));
         return this;
     }
 
-    public void queue(UUID id, String queueName, String owner, boolean 
exclusive, FieldTable arguments, UUID alternateExchangeId)
-    {
-        try
-        {
-            AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueName);
-
-            if (q == null)
-            {
-                q = AMQQueueFactory.createAMQQueueImpl(id, queueName, true, 
owner, false, exclusive, _virtualHost,
-                                                       
FieldTable.convertToMap(arguments));
-                _virtualHost.getQueueRegistry().registerQueue(q);
-
-                if (alternateExchangeId != null)
-                {
-                    Exchange altExchange = 
_exchangeRegistry.getExchange(alternateExchangeId);
-                    if (altExchange == null)
-                    {
-                        _logger.error("Unknown exchange id " + 
alternateExchangeId + ", cannot set alternate exchange on queue with id " + id);
-                        return;
-                    }
-                    q.setAlternateExchange(altExchange);
-                }
-            }
-
-            CurrentActor.get().message(_logSubject, 
TransactionLogMessages.RECOVERY_START(queueName, true));
-
-            //Record that we have a queue for recovery
-            _queueRecoveries.put(queueName, 0);
-        }
-        catch (AMQException e)
-        {
-            throw new RuntimeException("Error recovering queue uuid " + id + " 
name " + queueName, e);
-        }
-    }
-
-    public void exchange(UUID id, String exchangeName, String type, boolean 
autoDelete)
-    {
-        try
-        {
-            Exchange exchange;
-            exchange = _exchangeRegistry.getExchange(exchangeName);
-            if (exchange == null)
-            {
-                exchange = _exchangeFactory.createExchange(id, exchangeName, 
type, true, autoDelete);
-                _exchangeRegistry.registerExchange(exchange);
-            }
-        }
-        catch (AMQException e)
-        {
-            throw new RuntimeException("Error recovering exchange uuid " + id 
+ " name " + exchangeName, e);
-        }
-    }
-
     public StoredMessageRecoveryHandler begin()
     {
         return this;
@@ -347,56 +262,6 @@ public class VirtualHostConfigRecoveryHa
         CurrentActor.get().message(_logSubject, 
TransactionLogMessages.RECOVERY_COMPLETE(null, false));
     }
 
-    private void binding(UUID bindingId, UUID exchangeId, UUID queueId, String 
bindingKey, ByteBuffer buf)
-    {
-        try
-        {
-            Exchange exchange = _exchangeRegistry.getExchange(exchangeId);
-            if (exchange == null)
-            {
-                _logger.error("Unknown exchange id " + exchangeId + ", cannot 
bind queue with id " + queueId);
-                return;
-            }
-
-            AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueId);
-            if (queue == null)
-            {
-                _logger.error("Unknown queue id " + queueId + ", cannot be 
bound to exchange: " + exchange.getName());
-            }
-            else
-            {
-                FieldTable argumentsFT = null;
-                if(buf != null)
-                {
-                    try
-                    {
-                        argumentsFT = new FieldTable(new DataInputStream(new 
ByteBufferInputStream(buf)),buf.limit());
-                    }
-                    catch (IOException e)
-                    {
-                        throw new RuntimeException("IOException should not be 
thrown here", e);
-                    }
-                }
-
-                Map<String, Object> argumentMap = 
FieldTable.convertToMap(argumentsFT);
-
-                if(exchange.getBinding(bindingKey, queue, argumentMap) == null)
-                {
-
-                    _logger.info("Restoring binding: (Exchange: " + 
exchange.getNameShortString() + ", Queue: " + queue.getName()
-                        + ", Routing Key: " + bindingKey + ", Arguments: " + 
argumentsFT + ")");
-
-                    exchange.restoreBinding(bindingId, bindingKey, queue, 
argumentMap);
-                }
-            }
-        }
-        catch (AMQException e)
-        {
-             throw new RuntimeException(e);
-        }
-
-    }
-
     public void complete()
     {
     }
@@ -478,201 +343,6 @@ public class VirtualHostConfigRecoveryHa
         return this;
     }
 
-    @Override
-    public void configuredObject(UUID id, String type, Map<String, Object> 
attributes)
-    {
-        Map<UUID, Map<String, Object>> typeMap = _configuredObjects.get(type);
-        if(typeMap == null)
-        {
-            typeMap = new HashMap<UUID, Map<String, Object>>();
-            _configuredObjects.put(type,typeMap);
-        }
-        typeMap.put(id, attributes);
-    }
-
-    @Override
-    public int completeConfigurationRecovery()
-    {
-        if(CURRENT_CONFIG_VERSION !=_currentConfigVersion)
-        {
-            try
-            {
-                upgrade();
-            }
-            catch (AMQStoreException e)
-            {
-                throw new IllegalArgumentException("Unable to upgrade 
configuration from version " + _currentConfigVersion + " to version " + 
CURRENT_CONFIG_VERSION);
-            }
-        }
-
-        Map<UUID, Map<String, Object>> exchangeObjects =
-                
_configuredObjects.remove(org.apache.qpid.server.model.Exchange.class.getName());
-
-        if(exchangeObjects != null)
-        {
-            recoverExchanges(exchangeObjects);
-        }
-
-        Map<UUID, Map<String, Object>> queueObjects =
-                
_configuredObjects.remove(org.apache.qpid.server.model.Queue.class.getName());
-
-        if(queueObjects != null)
-        {
-            recoverQueues(queueObjects);
-        }
-
-
-        Map<UUID, Map<String, Object>> bindingObjects =
-                    _configuredObjects.remove(Binding.class.getName());
-
-        if(bindingObjects != null)
-        {
-            recoverBindings(bindingObjects);
-        }
-
-
-        CurrentActor.get().message(_logSubject, 
ConfigStoreMessages.RECOVERY_COMPLETE());
-
-        return CURRENT_CONFIG_VERSION;
-    }
-
-    private void upgrade() throws AMQStoreException
-    {
-
-        Map<UUID, String> updates = new HashMap<UUID, String>();
-
-        final String bindingType = Binding.class.getName();
-
-        switch(_currentConfigVersion)
-        {
-            case 0:
-                Map<UUID, Map<String, Object>> bindingObjects =
-                                    _configuredObjects.get(bindingType);
-                if(bindingObjects != null)
-                {
-                    for(Map.Entry<UUID, Map<String,Object>> bindingEntry : 
bindingObjects.entrySet())
-                    {
-                        Map<String, Object> binding = bindingEntry.getValue();
-
-                        if(hasSelectorArguments(binding) && 
!isTopicExchange(binding))
-                        {
-                            binding = new LinkedHashMap<String, 
Object>(binding);
-                            removeSelectorArguments(binding);
-                            bindingEntry.setValue(binding);
-
-                            updates.put(bindingEntry.getKey(), bindingType);
-                        }
-                    }
-                }
-            case CURRENT_CONFIG_VERSION:
-                if(!updates.isEmpty())
-                {
-                    ConfiguredObjectRecord[] updateRecords = new 
ConfiguredObjectRecord[updates.size()];
-                    int i = 0;
-                    for(Map.Entry<UUID, String> update : updates.entrySet())
-                    {
-                        updateRecords[i++] = new 
ConfiguredObjectRecord(update.getKey(), update.getValue(), 
_configuredObjects.get(update.getValue()).get(update.getKey()));
-                    }
-                    _configStore.update(updateRecords);
-                }
-                break;
-            default:
-                throw new IllegalStateException("Unknown configuration model 
version: " + _currentConfigVersion + ". Are you attempting to run an older 
instance against an upgraded configuration?");
-        }
-    }
-
-    private void removeSelectorArguments(Map<String, Object> binding)
-    {
-        @SuppressWarnings("unchecked")
-        Map<String, Object> arguments = new LinkedHashMap<String, 
Object>((Map<String,Object>)binding.get(Binding.ARGUMENTS));
-
-        FilterSupport.removeFilters(arguments);
-        binding.put(Binding.ARGUMENTS, arguments);
-    }
-
-    private boolean isTopicExchange(Map<String, Object> binding)
-    {
-        UUID exchangeId = 
UUID.fromString((String)binding.get(Binding.EXCHANGE));
-        final
-        Map<UUID, Map<String, Object>> exchanges =
-                
_configuredObjects.get(org.apache.qpid.server.model.Exchange.class.getName());
-
-        if(exchanges != null && exchanges.containsKey(exchangeId))
-        {
-            return 
"topic".equals(exchanges.get(exchangeId).get(org.apache.qpid.server.model.Exchange.TYPE));
-        }
-        else
-        {
-            return _exchangeRegistry.getExchange(exchangeId) != null
-                   && _exchangeRegistry.getExchange(exchangeId).getType() == 
TopicExchange.TYPE;
-        }
-
-    }
-
-    private boolean hasSelectorArguments(Map<String, Object> binding)
-    {
-        @SuppressWarnings("unchecked")
-        Map<String, Object> arguments = (Map<String, Object>) 
binding.get(Binding.ARGUMENTS);
-        return (arguments != null) && 
FilterSupport.argumentsContainFilter(arguments);
-    }
-
-    private void recoverExchanges(Map<UUID, Map<String, Object>> 
exchangeObjects)
-    {
-        for(Map.Entry<UUID, Map<String,Object>> entry : 
exchangeObjects.entrySet())
-        {
-            Map<String,Object> attributeMap = entry.getValue();
-            String exchangeName = (String) 
attributeMap.get(org.apache.qpid.server.model.Exchange.NAME);
-            String exchangeType = (String) 
attributeMap.get(org.apache.qpid.server.model.Exchange.TYPE);
-            String lifeTimePolicy = (String) 
attributeMap.get(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY);
-            boolean autoDelete = lifeTimePolicy == null
-                    || LifetimePolicy.valueOf(lifeTimePolicy) == 
LifetimePolicy.AUTO_DELETE;
-            exchange(entry.getKey(), exchangeName, exchangeType, autoDelete);
-        }
-    }
-
-    private void recoverQueues(Map<UUID, Map<String, Object>> queueObjects)
-    {
-        for(Map.Entry<UUID, Map<String,Object>> entry : 
queueObjects.entrySet())
-        {
-            Map<String,Object> attributeMap = entry.getValue();
-
-            String queueName = (String) attributeMap.get(Queue.NAME);
-            String owner = (String) attributeMap.get(Queue.OWNER);
-            boolean exclusive = (Boolean) attributeMap.get(Queue.EXCLUSIVE);
-            UUID alternateExchangeId = 
attributeMap.get(Queue.ALTERNATE_EXCHANGE) == null ? null : 
UUID.fromString((String)attributeMap.get(Queue.ALTERNATE_EXCHANGE));
-            @SuppressWarnings("unchecked")
-            Map<String, Object> queueArgumentsMap = (Map<String, Object>) 
attributeMap.get(Queue.ARGUMENTS);
-            FieldTable arguments = null;
-            if (queueArgumentsMap != null)
-            {
-                arguments = FieldTable.convertToFieldTable(queueArgumentsMap);
-            }
-            queue(entry.getKey(), queueName, owner, exclusive, arguments, 
alternateExchangeId);
-        }
-    }
-
-    private void recoverBindings(Map<UUID, Map<String, Object>> bindingObjects)
-    {
-        for(Map.Entry<UUID, Map<String,Object>> entry : 
bindingObjects.entrySet())
-        {
-            Map<String,Object> attributeMap = entry.getValue();
-            UUID exchangeId = 
UUID.fromString((String)attributeMap.get(Binding.EXCHANGE));
-            UUID queueId = UUID.fromString((String) 
attributeMap.get(Binding.QUEUE));
-            String bindingName = (String) attributeMap.get(Binding.NAME);
-
-            @SuppressWarnings("unchecked")
-            Map<String, Object> bindingArgumentsMap = (Map<String, Object>) 
attributeMap.get(Binding.ARGUMENTS);
-            FieldTable arguments = null;
-            if (bindingArgumentsMap != null)
-            {
-                arguments = 
FieldTable.convertToFieldTable(bindingArgumentsMap);
-            }
-            ByteBuffer argumentsBB = (arguments == null ? null : 
ByteBuffer.wrap(arguments.getDataAsBytes()));
-
-            binding(entry.getKey(), exchangeId, queueId, bindingName, 
argumentsBB);
-        }
-    }
-
     private static class DummyMessage implements EnqueableMessage
     {
 

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java?rev=1514360&r1=1514359&r2=1514360&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java
 Thu Aug 15 16:42:39 2013
@@ -37,7 +37,8 @@ public class MessageStoreLogSubjectTest 
 
         _testVhost = BrokerTestHelper.createVirtualHost("test");
 
-        _subject = new MessageStoreLogSubject(_testVhost, _testVhost.getMessageStore().getClass().getSimpleName());
+        _subject = new MessageStoreLogSubject(_testVhost.getName(),
+                                              _testVhost.getMessageStore().getClass().getSimpleName());
     }
 
     @Override

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java?rev=1514360&r1=1514359&r2=1514360&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
 Thu Aug 15 16:42:39 2013
@@ -56,6 +56,11 @@ import org.apache.qpid.util.FileUtils;
 public abstract class AbstractDurableConfigurationStoreTestCase extends 
QpidTestCase
 {
     private static final String EXCHANGE_NAME = "exchangeName";
+
+    private static final String EXCHANGE = 
org.apache.qpid.server.model.Exchange.class.getSimpleName();
+    private static final String BINDING = 
org.apache.qpid.server.model.Binding.class.getSimpleName();
+    private static final String QUEUE = Queue.class.getSimpleName();
+
     private String _storePath;
     private String _storeName;
     private MessageStore _messageStore;
@@ -134,7 +139,7 @@ public abstract class AbstractDurableCon
         DurableConfigurationStoreHelper.createExchange(_configStore, exchange);
 
         reopenStore();
-        verify(_recoveryHandler).configuredObject(eq(_exchangeId), 
eq(org.apache.qpid.server.model.Exchange.class.getName()),
+        verify(_recoveryHandler).configuredObject(eq(_exchangeId), 
eq(EXCHANGE),
                 eq(map( org.apache.qpid.server.model.Exchange.NAME, getName(),
                         org.apache.qpid.server.model.Exchange.TYPE, 
getName()+"Type",
                         org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, 
LifetimePolicy.AUTO_DELETE.toString())));
@@ -186,7 +191,7 @@ public abstract class AbstractDurableCon
         map.put(org.apache.qpid.server.model.Binding.NAME, ROUTING_KEY);
         
map.put(org.apache.qpid.server.model.Binding.ARGUMENTS,FieldTable.convertToMap(_bindingArgs));
 
-        verify(_recoveryHandler).configuredObject(eq(binding.getId()), 
eq(org.apache.qpid.server.model.Binding.class.getName()),
+        verify(_recoveryHandler).configuredObject(eq(binding.getId()), 
eq(BINDING),
                 eq(map));
     }
 
@@ -201,7 +206,7 @@ public abstract class AbstractDurableCon
         reopenStore();
 
         verify(_recoveryHandler, never()).configuredObject(any(UUID.class),
-                eq(org.apache.qpid.server.model.Binding.class.getName()),
+                eq(BINDING),
                 anyMap());
     }
 
@@ -215,7 +220,7 @@ public abstract class AbstractDurableCon
         queueAttributes.put(Queue.NAME, getName());
         queueAttributes.put(Queue.OWNER, getName()+"Owner");
         queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
-        verify(_recoveryHandler).configuredObject(eq(_queueId), 
eq(Queue.class.getName()), eq(queueAttributes));
+        verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), 
eq(queueAttributes));
     }
 
     public void testCreateQueueAMQQueueFieldTable() throws Exception
@@ -238,7 +243,7 @@ public abstract class AbstractDurableCon
         queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
         queueAttributes.put(Queue.ARGUMENTS, attributes);
 
-        verify(_recoveryHandler).configuredObject(eq(_queueId), 
eq(Queue.class.getName()), eq(queueAttributes));
+        verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), 
eq(queueAttributes));
     }
 
     public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception
@@ -256,7 +261,7 @@ public abstract class AbstractDurableCon
         queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
         queueAttributes.put(Queue.ALTERNATE_EXCHANGE, 
alternateExchange.getId().toString());
 
-        verify(_recoveryHandler).configuredObject(eq(_queueId), 
eq(Queue.class.getName()), eq(queueAttributes));
+        verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), 
eq(queueAttributes));
     }
 
     private Exchange createTestAlternateExchange()
@@ -292,7 +297,7 @@ public abstract class AbstractDurableCon
         queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE);
         queueAttributes.put(Queue.ARGUMENTS, attributes);
 
-        verify(_recoveryHandler).configuredObject(eq(_queueId), 
eq(Queue.class.getName()), eq(queueAttributes));
+        verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), 
eq(queueAttributes));
 
     }
 
@@ -323,7 +328,7 @@ public abstract class AbstractDurableCon
         queueAttributes.put(Queue.ARGUMENTS, attributes);
         queueAttributes.put(Queue.ALTERNATE_EXCHANGE, 
alternateExchange.getId().toString());
 
-        verify(_recoveryHandler).configuredObject(eq(_queueId), 
eq(Queue.class.getName()), eq(queueAttributes));
+        verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), 
eq(queueAttributes));
     }
 
     public void testRemoveQueue() throws Exception

Copied: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
 (from r1513398, 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java)
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java?p2=qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java&p1=qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java&r1=1513398&r2=1514360&rev=1514360&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
 Thu Aug 15 16:42:39 2013
@@ -21,11 +21,14 @@
 package org.apache.qpid.server.virtualhost;
 
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.exchange.DirectExchange;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeFactory;
@@ -39,12 +42,15 @@ import org.apache.qpid.server.plugin.Exc
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.DurableConfigurationRecoverer;
 import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
@@ -53,17 +59,20 @@ import static org.mockito.Mockito.when;
 
 import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION;
 
-public class VirtualHostConfigRecoveryHandlerTest extends QpidTestCase
+public class DurableConfigurationRecovererTest extends QpidTestCase
 {
+    private static final UUID QUEUE_ID = new UUID(0,0);
+    private static final UUID TOPIC_EXCHANGE_ID = new UUID(0,1);
+    private static final UUID DIRECT_EXCHANGE_ID = new UUID(0,2);
+    private static final String CUSTOM_EXCHANGE_NAME = "customExchange";
+
+    private DurableConfigurationRecoverer _durableConfigurationRecoverer;
     private Exchange _directExchange;
     private Exchange _topicExchange;
     private VirtualHost _vhost;
-    private VirtualHostConfigRecoveryHandler _virtualHostConfigRecoveryHandler;
     private DurableConfigurationStore _store;
-
-    private static final UUID QUEUE_ID = new UUID(0,0);
-    private static final UUID TOPIC_EXCHANGE_ID = new UUID(0,1);
-    private static final UUID DIRECT_EXCHANGE_ID = new UUID(0,2);
+    private ExchangeFactory _exchangeFactory;
+    private ExchangeRegistry _exchangeRegistry;
 
     @Override
     public void setUp() throws Exception
@@ -82,17 +91,31 @@ public class VirtualHostConfigRecoveryHa
 
         _vhost = mock(VirtualHost.class);
 
-        ExchangeRegistry exchangeRegistry = mock(ExchangeRegistry.class);
-        
when(exchangeRegistry.getExchange(eq(DIRECT_EXCHANGE_ID))).thenReturn(_directExchange);
-        
when(exchangeRegistry.getExchange(eq(TOPIC_EXCHANGE_ID))).thenReturn(_topicExchange);
+        _exchangeRegistry = mock(ExchangeRegistry.class);
+        
when(_exchangeRegistry.getExchange(eq(DIRECT_EXCHANGE_ID))).thenReturn(_directExchange);
+        
when(_exchangeRegistry.getExchange(eq(TOPIC_EXCHANGE_ID))).thenReturn(_topicExchange);
 
         QueueRegistry queueRegistry = mock(QueueRegistry.class);
         when(_vhost.getQueueRegistry()).thenReturn(queueRegistry);
 
         when(queueRegistry.getQueue(eq(QUEUE_ID))).thenReturn(queue);
 
-        ExchangeFactory exchangeFactory = mock(ExchangeFactory.class);
-        _virtualHostConfigRecoveryHandler = new 
VirtualHostConfigRecoveryHandler(_vhost, exchangeRegistry, exchangeFactory);
+        _exchangeFactory = mock(ExchangeFactory.class);
+
+        DurableConfiguredObjectRecoverer[] recoverers = {
+                new QueueRecoverer(_vhost, _exchangeRegistry),
+                new ExchangeRecoverer(_exchangeRegistry, _exchangeFactory),
+                new BindingRecoverer(_vhost, _exchangeRegistry)
+        };
+
+        final Map<String, DurableConfiguredObjectRecoverer> recovererMap= new 
HashMap<String, DurableConfiguredObjectRecoverer>();
+        for(DurableConfiguredObjectRecoverer recoverer : recoverers)
+        {
+            recovererMap.put(recoverer.getType(), recoverer);
+        }
+        _durableConfigurationRecoverer =
+                new DurableConfigurationRecoverer(_vhost.getName(), 
recovererMap,
+                                                  new 
DefaultUpgraderProvider(_vhost, _exchangeRegistry));
 
         _store = mock(DurableConfigurationStore.class);
 
@@ -101,16 +124,18 @@ public class VirtualHostConfigRecoveryHa
 
     public void testUpgradeEmptyStore() throws Exception
     {
-        _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0);
-        assertEquals("Did not upgrade to the expected version", CURRENT_CONFIG_VERSION, _virtualHostConfigRecoveryHandler.completeConfigurationRecovery());
+        _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0);
+        assertEquals("Did not upgrade to the expected version",
+                     CURRENT_CONFIG_VERSION,
+                     _durableConfigurationRecoverer.completeConfigurationRecovery());
     }
 
     public void testUpgradeNewerStoreFails() throws Exception
     {
         try
         {
-            _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, CURRENT_CONFIG_VERSION+1);
-            _virtualHostConfigRecoveryHandler.completeConfigurationRecovery();
+            _durableConfigurationRecoverer.beginConfigurationRecovery(_store, CURRENT_CONFIG_VERSION + 1);
+            _durableConfigurationRecoverer.completeConfigurationRecovery();
             fail("Should not be able to start when config model is newer than current");
         }
         catch (IllegalStateException e)
@@ -122,20 +147,24 @@ public class VirtualHostConfigRecoveryHa
     public void testUpgradeRemovesBindingsToNonTopicExchanges() throws Exception
     {
 
-        _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0);
+        _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0);
 
-        _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0),
-                "org.apache.qpid.server.model.Binding",
-                createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble"));
+        _durableConfigurationRecoverer.configuredObject(new UUID(1, 0),
+                                                           "org.apache.qpid.server.model.Binding",
+                                                           createBinding("key",
+                                                                         DIRECT_EXCHANGE_ID,
+                                                                         QUEUE_ID,
+                                                                         "x-filter-jms-selector",
+                                                                         "wibble"));
 
         final ConfiguredObjectRecord[] expected = {
-                new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding",
+                new ConfiguredObjectRecord(new UUID(1, 0), "Binding",
                         createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID))
         };
 
         verifyCorrectUpdates(expected);
 
-        _virtualHostConfigRecoveryHandler.completeConfigurationRecovery();
+        _durableConfigurationRecoverer.completeConfigurationRecovery();
     }
 
 
@@ -143,69 +172,150 @@ public class VirtualHostConfigRecoveryHa
     public void testUpgradeOnlyRemovesSelectorBindings() throws Exception
     {
 
-        _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0);
-
-        _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0),
-                "org.apache.qpid.server.model.Binding",
-                createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble", "not-a-selector", "moo"));
-
-
-        UUID customExchangeId = new UUID(3,0);
-
-        _virtualHostConfigRecoveryHandler.configuredObject(new UUID(2, 0),
-                "org.apache.qpid.server.model.Binding",
-                createBinding("key", customExchangeId, QUEUE_ID, "x-filter-jms-selector", "wibble", "not-a-selector", "moo"));
-
-        _virtualHostConfigRecoveryHandler.configuredObject(customExchangeId,
-                "org.apache.qpid.server.model.Exchange",
-                createExchange("customExchange", HeadersExchange.TYPE));
-
+        _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0);
 
+        _durableConfigurationRecoverer.configuredObject(new UUID(1, 0),
+                                                           "org.apache.qpid.server.model.Binding",
+                                                           createBinding("key",
+                                                                         DIRECT_EXCHANGE_ID,
+                                                                         QUEUE_ID,
+                                                                         "x-filter-jms-selector",
+                                                                         "wibble",
+                                                                         "not-a-selector",
+                                                                         "moo"));
+
+
+        final UUID customExchangeId = new UUID(3,0);
+
+        _durableConfigurationRecoverer.configuredObject(new UUID(2, 0),
+                                                           "org.apache.qpid.server.model.Binding",
+                                                           createBinding("key",
+                                                                         customExchangeId,
+                                                                         QUEUE_ID,
+                                                                         "x-filter-jms-selector",
+                                                                         "wibble",
+                                                                         "not-a-selector",
+                                                                         "moo"));
+
+        _durableConfigurationRecoverer.configuredObject(customExchangeId,
+                                                           "org.apache.qpid.server.model.Exchange",
+                                                           createExchange(CUSTOM_EXCHANGE_NAME, HeadersExchange.TYPE));
+
+        final Exchange customExchange = mock(Exchange.class);
+
+        when(_exchangeFactory.createExchange(eq(customExchangeId),
+                                             eq(CUSTOM_EXCHANGE_NAME),
+                                             eq(HeadersExchange.TYPE.getType()),
+                                             anyBoolean(),
+                                             anyBoolean())).thenReturn(customExchange);
+        doAnswer(new Answer()
+        {
+            @Override
+            public Object answer(final InvocationOnMock invocation) throws Throwable
+            {
+                when(_exchangeRegistry.getExchange(eq(customExchangeId))).thenReturn(customExchange);
+                return null;
+            }
+        }).when(_exchangeRegistry).registerExchange(customExchange);
 
         final ConfiguredObjectRecord[] expected = {
                 new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding",
                         createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "not-a-selector", "moo")),
-                new ConfiguredObjectRecord(new UUID(3, 0), "org.apache.qpid.server.model.Binding",
+                new ConfiguredObjectRecord(new UUID(2, 0), "org.apache.qpid.server.model.Binding",
                         createBinding("key", customExchangeId, QUEUE_ID, "not-a-selector", "moo"))
         };
 
         verifyCorrectUpdates(expected);
 
-        _virtualHostConfigRecoveryHandler.completeConfigurationRecovery();
+        _durableConfigurationRecoverer.completeConfigurationRecovery();
     }
 
 
     public void testUpgradeKeepsBindingsToTopicExchanges() throws Exception
     {
 
-        _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0);
+        _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0);
 
-        _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0),
-                "org.apache.qpid.server.model.Binding",
-                createBinding("key", TOPIC_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble"));
+        _durableConfigurationRecoverer.configuredObject(new UUID(1, 0),
+                                                           "org.apache.qpid.server.model.Binding",
+                                                           createBinding("key",
+                                                                         TOPIC_EXCHANGE_ID,
+                                                                         QUEUE_ID,
+                                                                         "x-filter-jms-selector",
+                                                                         "wibble"));
 
         final ConfiguredObjectRecord[] expected = {
-                new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding",
+                new ConfiguredObjectRecord(new UUID(1, 0), "Binding",
                         createBinding("key", TOPIC_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble"))
         };
 
         verifyCorrectUpdates(expected);
 
-        _virtualHostConfigRecoveryHandler.completeConfigurationRecovery();
+        _durableConfigurationRecoverer.completeConfigurationRecovery();
     }
 
     public void testUpgradeDoesNotRecur() throws Exception
     {
 
-        _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 1);
+        _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2);
 
-        _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0),
-                "org.apache.qpid.server.model.Binding",
-                createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble"));
+        _durableConfigurationRecoverer.configuredObject(new UUID(1, 0),
+                                                           "Binding",
+                                                           createBinding("key",
+                                                                         DIRECT_EXCHANGE_ID,
+                                                                         QUEUE_ID,
+                                                                         "x-filter-jms-selector",
+                                                                         "wibble"));
 
         doThrow(new RuntimeException("Update Should not be called")).when(_store).update(any(ConfiguredObjectRecord[].class));
 
-        _virtualHostConfigRecoveryHandler.completeConfigurationRecovery();
+        _durableConfigurationRecoverer.completeConfigurationRecovery();
+    }
+
+    public void testFailsWithUnresolvedObjects()
+    {
+        _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2);
+
+
+        _durableConfigurationRecoverer.configuredObject(new UUID(1, 0),
+                                                        "Binding",
+                                                        createBinding("key",
+                                                                      new UUID(3,0),
+                                                                      QUEUE_ID,
+                                                                      "x-filter-jms-selector",
+                                                                      "wibble"));
+
+        try
+        {
+            _durableConfigurationRecoverer.completeConfigurationRecovery();
+            fail("Expected resolution to fail due to unknown object");
+        }
+        catch(IllegalConfigurationException e)
+        {
+            assertEquals("Durable configuration has unresolved dependencies", e.getMessage());
+        }
+
+    }
+
+    public void testFailsWithUnknownObjectType()
+    {
+        _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2);
+
+
+        try
+        {
+            final Map<String, Object> emptyArguments = Collections.emptyMap();
+            _durableConfigurationRecoverer.configuredObject(new UUID(1, 0),
+                                                            "Wibble", emptyArguments);
+            _durableConfigurationRecoverer.completeConfigurationRecovery();
+            fail("Expected resolution to fail due to unknown object type");
+        }
+        catch(IllegalConfigurationException e)
+        {
+            assertEquals("Unkown type for configured object: Wibble", e.getMessage());
+        }
+
+
     }
 
     private void verifyCorrectUpdates(final ConfiguredObjectRecord[] expected) throws AMQStoreException

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java?rev=1514360&r1=1514359&r2=1514360&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java Thu Aug 15 16:42:39 2013
@@ -466,7 +466,7 @@ public class FailoverBehaviourTest exten
 
         try
         {
-            // an implicit recover performed when acknowledge throws an 
exception due to failover 
+            // an implicit recover performed when acknowledge throws an 
exception due to failover
             lastMessage.acknowledge();
             fail("JMSException should be thrown");
         }
@@ -529,7 +529,7 @@ public class FailoverBehaviourTest exten
         Message lastMessage = consumeMessages();
         try
         {
-            // an implicit recover performed when acknowledge throws an 
exception due to failover 
+            // an implicit recover performed when acknowledge throws an 
exception due to failover
             lastMessage.acknowledge();
             fail("JMSException should be thrown");
         }
@@ -923,9 +923,9 @@ public class FailoverBehaviourTest exten
         final Map<String, Object> arguments = new HashMap<String, Object>();
         arguments.put("x-qpid-capacity", capacity);
         arguments.put("x-qpid-flow-resume-capacity", resumeCapacity);
-        ((AMQSession<?, ?>) session).createQueue(new AMQShortString(queueName), true, true, false, arguments);
+        ((AMQSession<?, ?>) session).createQueue(new AMQShortString(queueName), false, true, false, arguments);
         Queue queue = session.createQueue("direct://amq.direct/" + queueName + "/" + queueName + "?durable='" + true
-                + "'&autodelete='" + true + "'");
+                + "'&autodelete='" + false + "'");
         ((AMQSession<?, ?>) session).declareAndBind((AMQDestination) queue);
         return queue;
     }

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java?rev=1514360&r1=1514359&r2=1514360&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java Thu Aug 15 16:42:39 2013
@@ -190,6 +190,15 @@ public class SlowMessageStore implements
     }
 
     @Override
+    public UUID[] removeConfiguredObjects(final UUID... objects) throws AMQStoreException
+    {
+        doPreDelay("remove");
+        UUID[] removed = _durableConfigurationStore.removeConfiguredObjects(objects);
+        doPostDelay("remove");
+        return removed;
+    }
+
+    @Override
     public void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException
     {
         doPreDelay("update");
@@ -205,6 +214,14 @@ public class SlowMessageStore implements
         doPostDelay("update");
     }
 
+    @Override
+    public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws AMQStoreException
+    {
+        doPreDelay("update");
+        _durableConfigurationStore.update(createIfNecessary, records);
+        doPostDelay("update");
+    }
+
     public Transaction newTransaction()
     {
         doPreDelay("beginTran");



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to