Modified: 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java?rev=1589112&r1=1589111&r2=1589112&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
 Tue Apr 22 12:32:23 2014
@@ -20,9 +20,6 @@
  */
 package org.apache.qpid.server.store;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -31,13 +28,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.UUID;
 
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.configuration.startup.StoreUpgraderPhase;
-import org.apache.qpid.server.configuration.startup.UpgraderPhaseFactory;
 import org.apache.qpid.server.filter.FilterSupport;
 import org.apache.qpid.server.model.Binding;
-import org.apache.qpid.server.model.BrokerModel;
-import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.ConfiguredObjectFactory;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
@@ -45,14 +37,11 @@ import org.apache.qpid.server.model.UUID
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.VirtualHostNode;
 import org.apache.qpid.server.queue.QueueArgumentsConverter;
-import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
 public class VirtualHostStoreUpgraderAndRecoverer
 {
-    private final ConfiguredObjectFactory _objectFactory;
     private final VirtualHostNode<?> _virtualHostNode;
-    private Map<String, UpgraderPhaseFactory> _upgraders = new HashMap<String, 
UpgraderPhaseFactory>();
+    private Map<String, StoreUpgraderPhase> _upgraders = new HashMap<String, 
StoreUpgraderPhase>();
 
     @SuppressWarnings("serial")
     private static final Map<String, String> DEFAULT_EXCHANGES = 
Collections.unmodifiableMap(new HashMap<String, String>()
@@ -68,12 +57,11 @@ public class VirtualHostStoreUpgraderAnd
     public VirtualHostStoreUpgraderAndRecoverer(VirtualHostNode<?> 
virtualHostNode, ConfiguredObjectFactory objectFactory)
     {
         _virtualHostNode = virtualHostNode;
-        _objectFactory = objectFactory;
-        register(new UpgraderFactory_0_0());
-        register(new UpgraderFactory_0_1());
-        register(new UpgraderFactory_0_2());
-        register(new UpgraderFactory_0_3());
-        register(new UpgraderFactory_0_4());
+        register(new Upgrader_0_0_to_0_1());
+        register(new Upgrader_0_1_to_0_2());
+        register(new Upgrader_0_2_to_0_3());
+        register(new Upgrader_0_3_to_0_4());
+        register(new Upgrader_0_4_to_0_5());
 
         Map<String, UUID> defaultExchangeIds = new HashMap<String, UUID>();
         for (String exchangeName : DEFAULT_EXCHANGES.keySet())
@@ -84,9 +72,9 @@ public class VirtualHostStoreUpgraderAnd
         _defaultExchangeIds = Collections.unmodifiableMap(defaultExchangeIds);
     }
 
-    private void register(UpgraderPhaseFactory factory)
+    private void register(StoreUpgraderPhase upgrader)
     {
-        _upgraders.put(factory.getFromVersion(), factory);
+        _upgraders.put(upgrader.getFromVersion(), upgrader);
     }
 
     /*
@@ -94,100 +82,91 @@ public class VirtualHostStoreUpgraderAnd
      * such bindings would have been ignored, starting from the point at which 
the config version changed, these
      * arguments would actually cause selectors to be enforced, thus changing 
which messages would reach a queue.
      */
-    private class UpgraderFactory_0_0  extends UpgraderPhaseFactory
+    private class Upgrader_0_0_to_0_1  extends StoreUpgraderPhase
     {
         private final Map<UUID, ConfiguredObjectRecord> _records = new 
HashMap<UUID, ConfiguredObjectRecord>();
 
-        public UpgraderFactory_0_0()
+        public Upgrader_0_0_to_0_1()
         {
-            super("0.0", "0.1");
+            super("modelVersion", "0.0", "0.1");
         }
 
-
         @Override
-        public StoreUpgraderPhase newInstance()
+        public void configuredObject(final ConfiguredObjectRecord record)
         {
-            return new StoreUpgraderPhase("modelVersion", getToVersion())
-            {
+            _records.put(record.getId(), record);
+        }
 
-                @Override
-                public void configuredObject(final ConfiguredObjectRecord 
record)
-                {
-                    _records.put(record.getId(), record);
-                }
+        private void removeSelectorArguments(Map<String, Object> binding)
+        {
+            @SuppressWarnings("unchecked")
+            Map<String, Object> arguments = new LinkedHashMap<String, 
Object>((Map<String,Object>)binding.get(Binding.ARGUMENTS));
 
-                private void removeSelectorArguments(Map<String, Object> 
binding)
-                {
-                    @SuppressWarnings("unchecked")
-                    Map<String, Object> arguments = new LinkedHashMap<String, 
Object>((Map<String,Object>)binding.get(Binding.ARGUMENTS));
+            FilterSupport.removeFilters(arguments);
+            binding.put(Binding.ARGUMENTS, arguments);
+        }
 
-                    FilterSupport.removeFilters(arguments);
-                    binding.put(Binding.ARGUMENTS, arguments);
-                }
+        private boolean isTopicExchange(ConfiguredObjectRecord entry)
+        {
+            ConfiguredObjectRecord exchangeRecord = 
entry.getParents().get("Exchange");
+            if (exchangeRecord == null)
+            {
+                return false;
+            }
+            UUID exchangeId = exchangeRecord.getId();
 
-                private boolean isTopicExchange(ConfiguredObjectRecord entry)
+            if(_records.containsKey(exchangeId))
+            {
+                return "topic".equals(_records.get(exchangeId)
+                        .getAttributes()
+                        .get(org.apache.qpid.server.model.Exchange.TYPE));
+            }
+            else
+            {
+                if (_defaultExchangeIds.get("amq.topic").equals(exchangeId))
                 {
-                    ConfiguredObjectRecord exchangeRecord = 
entry.getParents().get("Exchange");
-                    if (exchangeRecord == null)
-                    {
-                        return false;
-                    }
-                    UUID exchangeId = exchangeRecord.getId();
+                    return true;
+                }
 
-                    if(_records.containsKey(exchangeId))
-                    {
-                        return "topic".equals(_records.get(exchangeId)
-                                .getAttributes()
-                                
.get(org.apache.qpid.server.model.Exchange.TYPE));
-                    }
-                    else
-                    {
-                        if 
(_defaultExchangeIds.get("amq.topic").equals(exchangeId))
-                        {
-                            return true;
-                        }
+                return false;
+            }
 
-                        return false;
-                    }
+        }
 
-                }
+        private boolean hasSelectorArguments(Map<String, Object> binding)
+        {
+            @SuppressWarnings("unchecked")
+            Map<String, Object> arguments = (Map<String, Object>) 
binding.get(Binding.ARGUMENTS);
+            return (arguments != null) && 
FilterSupport.argumentsContainFilter(arguments);
+        }
 
-                private boolean hasSelectorArguments(Map<String, Object> 
binding)
+        @Override
+        public void complete()
+        {
+            for(Map.Entry<UUID,ConfiguredObjectRecord> entry : 
_records.entrySet())
+            {
+                ConfiguredObjectRecord record = entry.getValue();
+                String type = record.getType();
+                Map<String, Object> attributes = record.getAttributes();
+                UUID id = record.getId();
+                if ("org.apache.qpid.server.model.VirtualHost".equals(type))
                 {
-                    @SuppressWarnings("unchecked")
-                    Map<String, Object> arguments = (Map<String, Object>) 
binding.get(Binding.ARGUMENTS);
-                    return (arguments != null) && 
FilterSupport.argumentsContainFilter(arguments);
+                    record = upgradeRootRecord(record);
                 }
-
-                @Override
-                public void complete()
+                else if(type.equals(Binding.class.getName()) && 
hasSelectorArguments(attributes) && !isTopicExchange(record))
                 {
-                    for(Map.Entry<UUID,ConfiguredObjectRecord> entry : 
_records.entrySet())
-                    {
-                        ConfiguredObjectRecord record = entry.getValue();
-                        String type = record.getType();
-                        Map<String, Object> attributes = 
record.getAttributes();
-                        UUID id = record.getId();
-                        if 
("org.apache.qpid.server.model.VirtualHost".equals(type))
-                        {
-                            record = upgradeRootRecord(record);
-                        }
-                        else if(type.equals(Binding.class.getName()) && 
hasSelectorArguments(attributes) && !isTopicExchange(record))
-                        {
-                            attributes = new LinkedHashMap<String, 
Object>(attributes);
-                            removeSelectorArguments(attributes);
-
-                            record = new ConfiguredObjectRecordImpl(id, type, 
attributes, record.getParents());
-                            getUpdateMap().put(id, record);
-                            entry.setValue(record);
+                    attributes = new LinkedHashMap<String, Object>(attributes);
+                    removeSelectorArguments(attributes);
 
-                        }
-                        getNextUpgrader().configuredObject(record);
-                    }
+                    record = new ConfiguredObjectRecordImpl(id, type, 
attributes, record.getParents());
+                    getUpdateMap().put(id, record);
+                    entry.setValue(record);
 
-                    getNextUpgrader().complete();
                 }
-            };
+                getNextUpgrader().configuredObject(record);
+            }
+
+            getNextUpgrader().complete();
         }
 
     }
@@ -196,76 +175,68 @@ public class VirtualHostStoreUpgraderAnd
      * Change the type string from org.apache.qpid.server.model.Foo to Foo (in 
line with the practice in the broker
      * configuration store).  Also remove bindings which reference nonexistent 
queues or exchanges.
      */
-    private class UpgraderFactory_0_1 extends UpgraderPhaseFactory
+    private class Upgrader_0_1_to_0_2 extends StoreUpgraderPhase
     {
-        protected UpgraderFactory_0_1()
+        public Upgrader_0_1_to_0_2()
         {
-            super("0.1", "0.2");
+            super("modelVersion", "0.1", "0.2");
         }
 
         @Override
-        public StoreUpgraderPhase newInstance()
+        public void configuredObject(final ConfiguredObjectRecord record)
         {
-            return new StoreUpgraderPhase("modelVersion", getToVersion())
+            String type = record.getType().substring(1 + 
record.getType().lastIndexOf('.'));
+            ConfiguredObjectRecord newRecord = new 
ConfiguredObjectRecordImpl(record.getId(), type, record.getAttributes(), 
record.getParents());
+            getUpdateMap().put(record.getId(), newRecord);
+
+            if ("VirtualHost".equals(type))
             {
+                newRecord = upgradeRootRecord(newRecord);
+            }
+        }
 
-                @Override
-                public void configuredObject(final ConfiguredObjectRecord 
record)
+        @Override
+        public void complete()
+        {
+            for (Iterator<Map.Entry<UUID, ConfiguredObjectRecord>> iterator = 
getUpdateMap().entrySet().iterator(); iterator.hasNext();)
+            {
+                Map.Entry<UUID, ConfiguredObjectRecord> entry = 
iterator.next();
+                final ConfiguredObjectRecord record = entry.getValue();
+                final ConfiguredObjectRecord exchangeParent = 
record.getParents().get(Exchange.class.getSimpleName());
+                final ConfiguredObjectRecord queueParent = 
record.getParents().get(Queue.class.getSimpleName());
+                if(isBinding(record.getType()) && (exchangeParent == null || 
unknownExchange(exchangeParent.getId())
+                                                   || queueParent == null || 
unknownQueue(queueParent.getId())))
                 {
-                    String type = record.getType().substring(1 + 
record.getType().lastIndexOf('.'));
-                    ConfiguredObjectRecord newRecord = new 
ConfiguredObjectRecordImpl(record.getId(), type, record.getAttributes(), 
record.getParents());
-                    getUpdateMap().put(record.getId(), newRecord);
-
-                    if ("VirtualHost".equals(type))
-                    {
-                        newRecord = upgradeRootRecord(newRecord);
-                    }
+                    getDeleteMap().put(entry.getKey(), entry.getValue());
+                    iterator.remove();
                 }
-
-                @Override
-                public void complete()
+                else
                 {
-                    for (Iterator<Map.Entry<UUID, ConfiguredObjectRecord>> 
iterator = getUpdateMap().entrySet().iterator(); iterator.hasNext();)
-                    {
-                        Map.Entry<UUID, ConfiguredObjectRecord> entry = 
iterator.next();
-                        final ConfiguredObjectRecord record = entry.getValue();
-                        final ConfiguredObjectRecord exchangeParent = 
record.getParents().get(Exchange.class.getSimpleName());
-                        final ConfiguredObjectRecord queueParent = 
record.getParents().get(Queue.class.getSimpleName());
-                        if(isBinding(record.getType()) && (exchangeParent == 
null || unknownExchange(exchangeParent.getId())
-                                                           || queueParent == 
null || unknownQueue(queueParent.getId())))
-                        {
-                            getDeleteMap().put(entry.getKey(), 
entry.getValue());
-                            iterator.remove();
-                        }
-                        else
-                        {
-                            getNextUpgrader().configuredObject(record);
-                        }
-                    }
-                    getNextUpgrader().complete();
+                    getNextUpgrader().configuredObject(record);
                 }
+            }
+            getNextUpgrader().complete();
+        }
 
-                private boolean unknownExchange(final UUID exchangeId)
-                {
-                    if (_defaultExchangeIds.containsValue(exchangeId))
-                    {
-                        return false;
-                    }
-                    ConfiguredObjectRecord localRecord = 
getUpdateMap().get(exchangeId);
-                    return !(localRecord != null && 
localRecord.getType().equals(Exchange.class.getSimpleName()));
-                }
+        private boolean unknownExchange(final UUID exchangeId)
+        {
+            if (_defaultExchangeIds.containsValue(exchangeId))
+            {
+                return false;
+            }
+            ConfiguredObjectRecord localRecord = 
getUpdateMap().get(exchangeId);
+            return !(localRecord != null && 
localRecord.getType().equals(Exchange.class.getSimpleName()));
+        }
 
-                private boolean unknownQueue(final UUID queueId)
-                {
-                    ConfiguredObjectRecord localRecord = 
getUpdateMap().get(queueId);
-                    return !(localRecord != null  && 
localRecord.getType().equals(Queue.class.getSimpleName()));
-                }
+        private boolean unknownQueue(final UUID queueId)
+        {
+            ConfiguredObjectRecord localRecord = getUpdateMap().get(queueId);
+            return !(localRecord != null  && 
localRecord.getType().equals(Queue.class.getSimpleName()));
+        }
 
-                private boolean isBinding(final String type)
-                {
-                    return Binding.class.getSimpleName().equals(type);
-                }
-            };
+        private boolean isBinding(final String type)
+        {
+            return Binding.class.getSimpleName().equals(type);
         }
     }
 
@@ -274,52 +245,46 @@ public class VirtualHostStoreUpgraderAnd
      * Convert the storage of queue attributes to remove the separate 
"ARGUMENT" attribute, and flatten the
      * attributes into the map using the model attribute names rather than the 
wire attribute names
      */
-    private class UpgraderFactory_0_2 extends UpgraderPhaseFactory
+    private class Upgrader_0_2_to_0_3 extends StoreUpgraderPhase
     {
-        protected UpgraderFactory_0_2()
+        private static final String ARGUMENTS = "arguments";
+
+        public Upgrader_0_2_to_0_3()
         {
-            super("0.2", "0.3");
+            super("modelVersion", "0.2", "0.3");
         }
 
+        @SuppressWarnings("unchecked")
         @Override
-        public StoreUpgraderPhase newInstance()
+        public void configuredObject(ConfiguredObjectRecord record)
         {
-            return new StoreUpgraderPhase("modelVersion", getToVersion())
+            if("VirtualHost".equals(record.getType()))
             {
-                private static final String ARGUMENTS = "arguments";
-
-                @SuppressWarnings("unchecked")
-                @Override
-                public void configuredObject(ConfiguredObjectRecord record)
+                record = upgradeRootRecord(record);
+            }
+            else if("Queue".equals(record.getType()))
+            {
+                Map<String, Object> newAttributes = new LinkedHashMap<String, 
Object>();
+                if(record.getAttributes().get(ARGUMENTS) instanceof Map)
                 {
-                    if("VirtualHost".equals(record.getType()))
-                    {
-                        record = upgradeRootRecord(record);
-                    }
-                    else if("Queue".equals(record.getType()))
-                    {
-                        Map<String, Object> newAttributes = new 
LinkedHashMap<String, Object>();
-                        if(record.getAttributes().get(ARGUMENTS) instanceof 
Map)
-                        {
-                            
newAttributes.putAll(QueueArgumentsConverter.convertWireArgsToModel((Map<String,
 Object>) record.getAttributes()
-                                    .get(ARGUMENTS)));
-                        }
-                        newAttributes.putAll(record.getAttributes());
+                    
newAttributes.putAll(QueueArgumentsConverter.convertWireArgsToModel((Map<String,
 Object>) record.getAttributes()
+                            .get(ARGUMENTS)));
+                }
+                newAttributes.putAll(record.getAttributes());
 
-                        record = new 
ConfiguredObjectRecordImpl(record.getId(), record.getType(), newAttributes, 
record.getParents());
-                        getUpdateMap().put(record.getId(), record);
-                    }
+                record = new ConfiguredObjectRecordImpl(record.getId(), 
record.getType(), newAttributes, record.getParents());
+                getUpdateMap().put(record.getId(), record);
+            }
 
-                    getNextUpgrader().configuredObject(record);
-                }
+            getNextUpgrader().configuredObject(record);
+        }
 
-                @Override
-                public void complete()
-                {
-                    getNextUpgrader().complete();
-                }
-            };
+        @Override
+        public void complete()
+        {
+            getNextUpgrader().complete();
         }
+
     }
 
     /*
@@ -327,387 +292,125 @@ public class VirtualHostStoreUpgraderAnd
      * where exclusive was false it will now be "NONE", and where true it will 
now be "CONTAINER"
      * ensure OWNER is null unless the exclusivity policy is CONTAINER
      */
-    private class UpgraderFactory_0_3 extends UpgraderPhaseFactory
+    private class Upgrader_0_3_to_0_4 extends StoreUpgraderPhase
     {
-        protected UpgraderFactory_0_3()
+        private static final String EXCLUSIVE = "exclusive";
+
+        public Upgrader_0_3_to_0_4()
         {
-            super("0.3", "0.4");
+            super("modelVersion", "0.3", "0.4");
         }
 
+
         @Override
-        public StoreUpgraderPhase newInstance()
+        public void configuredObject(ConfiguredObjectRecord record)
         {
-            return new StoreUpgraderPhase("modelVersion", getToVersion())
+            if("VirtualHost".equals(record.getType()))
             {
-                private static final String EXCLUSIVE = "exclusive";
-
-                @Override
-                public void configuredObject(ConfiguredObjectRecord record)
+                record = upgradeRootRecord(record);
+            }
+            else if(Queue.class.getSimpleName().equals(record.getType()))
+            {
+                Map<String, Object> newAttributes = new LinkedHashMap<String, 
Object>(record.getAttributes());
+                if(record.getAttributes().get(EXCLUSIVE) instanceof Boolean)
                 {
-                    if("VirtualHost".equals(record.getType()))
+                    boolean isExclusive = (Boolean) 
record.getAttributes().get(EXCLUSIVE);
+                    newAttributes.put(EXCLUSIVE, isExclusive ? "CONTAINER" : 
"NONE");
+                    if(!isExclusive && 
record.getAttributes().containsKey("owner"))
                     {
-                        record = upgradeRootRecord(record);
+                        newAttributes.remove("owner");
                     }
-                    else 
if(Queue.class.getSimpleName().equals(record.getType()))
-                    {
-                        Map<String, Object> newAttributes = new 
LinkedHashMap<String, Object>(record.getAttributes());
-                        if(record.getAttributes().get(EXCLUSIVE) instanceof 
Boolean)
-                        {
-                            boolean isExclusive = (Boolean) 
record.getAttributes().get(EXCLUSIVE);
-                            newAttributes.put(EXCLUSIVE, isExclusive ? 
"CONTAINER" : "NONE");
-                            if(!isExclusive && 
record.getAttributes().containsKey("owner"))
-                            {
-                                newAttributes.remove("owner");
-                            }
-                        }
-                        else
-                        {
-                            newAttributes.remove("owner");
-                        }
-                        if(!record.getAttributes().containsKey("durable"))
-                        {
-                            newAttributes.put("durable","true");
-                        }
-
-                        record = new 
ConfiguredObjectRecordImpl(record.getId(),record.getType(),newAttributes, 
record.getParents());
-                        getUpdateMap().put(record.getId(), record);
-                    }
-
-                    getNextUpgrader().configuredObject(record);
                 }
-
-                @Override
-                public void complete()
+                else
                 {
-                    getNextUpgrader().complete();
+                    newAttributes.remove("owner");
                 }
-            };
-        }
-    }
-
-    private class UpgraderFactory_0_4 extends UpgraderPhaseFactory
-    {
-        protected UpgraderFactory_0_4()
-        {
-            super("0.4", "2.0");
-        }
-
-        @Override
-        public StoreUpgraderPhase newInstance()
-        {
-            return new StoreUpgraderPhase("modelVersion", getToVersion())
-            {
-                private Map<String, String> _missingAmqpExchanges = new 
HashMap<String, String>(DEFAULT_EXCHANGES);
-                private static final String EXCHANGE_NAME = "name";
-                private static final String EXCHANGE_TYPE = "type";
-                private static final String EXCHANGE_DURABLE = "durable";
-                private ConfiguredObjectRecord _virtualHostRecord;
-
-                @Override
-                public void configuredObject(ConfiguredObjectRecord record)
+                if(!record.getAttributes().containsKey("durable"))
                 {
-                    if("VirtualHost".equals(record.getType()))
-                    {
-                        record = upgradeRootRecord(record);
-                        Map<String, Object> virtualHostAttributes = new 
HashMap<String, Object>(record.getAttributes());
-                        virtualHostAttributes.put("name", 
_virtualHostNode.getName());
-                        virtualHostAttributes.put("modelVersion", 
getToVersion());
-                        record = new 
ConfiguredObjectRecordImpl(record.getId(), "VirtualHost", 
virtualHostAttributes, Collections.<String, ConfiguredObjectRecord>emptyMap());
-                        _virtualHostRecord = record;
-                    }
-                    else if("Exchange".equals(record.getType()))
-                    {
-                        Map<String, Object> attributes = 
record.getAttributes();
-                        String name = (String)attributes.get(EXCHANGE_NAME);
-                        _missingAmqpExchanges.remove(name);
-                    }
-                    getNextUpgrader().configuredObject(record);
+                    newAttributes.put("durable","true");
                 }
 
-                @Override
-                public void complete()
-                {
-                    for (Entry<String, String> entry : 
_missingAmqpExchanges.entrySet())
-                    {
-                        String name = entry.getKey();
-                        String type = entry.getValue();
-                        UUID id = _defaultExchangeIds.get(name);
-
-                        Map<String, Object> attributes = new HashMap<String, 
Object>();
-                        attributes.put(EXCHANGE_NAME, name);
-                        attributes.put(EXCHANGE_TYPE, type);
-                        attributes.put(EXCHANGE_DURABLE, true);
-
-                        ConfiguredObjectRecord record = new 
ConfiguredObjectRecordImpl(id, Exchange.class.getSimpleName(), attributes, 
Collections.singletonMap(_virtualHostRecord.getType(), _virtualHostRecord));
-                        getUpdateMap().put(id, record);
-
-                        getNextUpgrader().configuredObject(record);
-
-                    }
+                record = new 
ConfiguredObjectRecordImpl(record.getId(),record.getType(),newAttributes, 
record.getParents());
+                getUpdateMap().put(record.getId(), record);
+            }
 
-                    getNextUpgrader().complete();
-                }
-            };
+            getNextUpgrader().configuredObject(record);
         }
 
-    }
+        @Override
+        public void complete()
+        {
+            getNextUpgrader().complete();
+        }
 
-    public void perform(DurableConfigurationStore durableConfigurationStore)
-    {
-        UpgradeAndRecoveryHandler vhrh = new 
UpgradeAndRecoveryHandler(_virtualHostNode, _objectFactory, 
durableConfigurationStore, _upgraders);
-        durableConfigurationStore.visitConfiguredObjectRecords(vhrh);
     }
 
-    //TODO: generalize this class
-    private static class  UpgradeAndRecoveryHandler implements 
ConfiguredObjectRecordHandler
+    private class Upgrader_0_4_to_0_5 extends StoreUpgraderPhase
     {
-        private static Logger LOGGER = 
Logger.getLogger(UpgradeAndRecoveryHandler.class);
-
-        private final Map<UUID, ConfiguredObjectRecord> _records = new 
LinkedHashMap<UUID, ConfiguredObjectRecord>();
-        private Map<String, UpgraderPhaseFactory> _upgraders;
-
-        private final VirtualHostNode<?> _parent;
-        private final ConfiguredObjectFactory _configuredObjectFactory;
-        private final DurableConfigurationStore _store;
-
-        public UpgradeAndRecoveryHandler(VirtualHostNode<?> parent, 
ConfiguredObjectFactory configuredObjectFactory, DurableConfigurationStore 
durableConfigurationStore, Map<String, UpgraderPhaseFactory> upgraders)
-        {
-            super();
-            _parent = parent;
-            _configuredObjectFactory = configuredObjectFactory;
-            _upgraders = upgraders;
-            _store = durableConfigurationStore;
-        }
+        private Map<String, String> _missingAmqpExchanges = new 
HashMap<String, String>(DEFAULT_EXCHANGES);
+        private static final String EXCHANGE_NAME = "name";
+        private static final String EXCHANGE_TYPE = "type";
+        private static final String EXCHANGE_DURABLE = "durable";
+        private ConfiguredObjectRecord _virtualHostRecord;
 
-        @Override
-        public void begin()
+        public Upgrader_0_4_to_0_5()
         {
+            super("modelVersion", "0.4", "2.0");
         }
 
         @Override
-        public boolean handle(final ConfiguredObjectRecord record)
-        {
-            _records.put(record.getId(), record);
-            return true;
-        }
-
-        @Override
-        public void end()
+        public void configuredObject(ConfiguredObjectRecord record)
         {
-            String version = getCurrentVersion();
-
-            if (LOGGER.isInfoEnabled())
-            {
-                LOGGER.info("Store has model version " + version + ". Number 
of record(s) " + _records.size());
-            }
-
-            DurableConfigurationStoreUpgrader upgrader = 
buildUpgraderChain(version);
-
-            for(ConfiguredObjectRecord record : _records.values())
-            {
-                upgrader.configuredObject(record);
-            }
-
-            upgrader.complete();
-
-            Map<UUID, ConfiguredObjectRecord> deletedRecords = 
upgrader.getDeletedRecords();
-            Map<UUID, ConfiguredObjectRecord> updatedRecords = 
upgrader.getUpdatedRecords();
-
-            if (LOGGER.isDebugEnabled())
+            if("VirtualHost".equals(record.getType()))
             {
-                LOGGER.debug("VirtualHost store upgrade: " + 
deletedRecords.size() + " record(s) deleted");
-                LOGGER.debug("VirtualHost store upgrade: " + 
updatedRecords.size() + " record(s) updated");
-                LOGGER.debug("VirtualHost store upgrade: " + _records.size() + 
" total record(s)");
+                record = upgradeRootRecord(record);
+                Map<String, Object> virtualHostAttributes = new 
HashMap<String, Object>(record.getAttributes());
+                virtualHostAttributes.put("name", _virtualHostNode.getName());
+                virtualHostAttributes.put("modelVersion", getToVersion());
+                record = new ConfiguredObjectRecordImpl(record.getId(), 
"VirtualHost", virtualHostAttributes, Collections.<String, 
ConfiguredObjectRecord>emptyMap());
+                _virtualHostRecord = record;
             }
-
-            _store.update(true, updatedRecords.values().toArray(new 
ConfiguredObjectRecord[updatedRecords.size()]));
-            _store.remove(deletedRecords.values().toArray(new 
ConfiguredObjectRecord[deletedRecords.size()]));
-
-            _records.keySet().removeAll(deletedRecords.keySet());
-            _records.putAll(updatedRecords);
-
-            ConfiguredObjectRecord virtualHostRecord = null;
-            for (ConfiguredObjectRecord record : _records.values())
+            else if("Exchange".equals(record.getType()))
             {
-                LOGGER.debug("Found type " +  record.getType());
-                if ("VirtualHost".equals(record.getType()))
-                {
-                    virtualHostRecord = record;
-                    break;
-                }
-            }
-
-            if (virtualHostRecord != null)
-            {
-                String parentCategory = 
_parent.getCategoryClass().getSimpleName();
-                ConfiguredObjectRecord parentRecord = new 
ConfiguredObjectRecordImpl(_parent.getId(), parentCategory, 
Collections.<String, Object>emptyMap());
-                Map<String, ConfiguredObjectRecord> rootParents = 
Collections.<String, ConfiguredObjectRecord>singletonMap(parentCategory, 
parentRecord);
-                _records.put(virtualHostRecord.getId(), new 
ConfiguredObjectRecordImpl(virtualHostRecord.getId(), 
VirtualHost.class.getSimpleName(), virtualHostRecord.getAttributes(), 
rootParents));
-                Collection<ConfiguredObjectRecord> records = _records.values();
-                resolveObjects(_configuredObjectFactory, _parent, 
records.toArray(new ConfiguredObjectRecord[records.size()]));
+                Map<String, Object> attributes = record.getAttributes();
+                String name = (String)attributes.get(EXCHANGE_NAME);
+                _missingAmqpExchanges.remove(name);
             }
+            getNextUpgrader().configuredObject(record);
         }
 
-        private DurableConfigurationStoreUpgrader buildUpgraderChain(String 
version)
-        {
-            DurableConfigurationStoreUpgrader head = null;
-            while(!BrokerModel.MODEL_VERSION.equals(version))
-            {
-                if (LOGGER.isDebugEnabled())
-                {
-                    LOGGER.debug("Adding virtual host store upgrader from 
model version: " + version);
-                }
-                final UpgraderPhaseFactory upgraderPhaseFactory = 
_upgraders.get(version);
-                StoreUpgraderPhase upgrader = 
upgraderPhaseFactory.newInstance();
-                if(head == null)
-                {
-                    head = upgrader;
-                }
-                else
-                {
-                    head.setNextUpgrader(upgrader);
-                }
-                version = upgraderPhaseFactory.getToVersion();
-            }
-
-            if(head == null)
-            {
-                head = new NullUpgrader();
-            }
-            else
-            {
-                head.setNextUpgrader(new NullUpgrader());
-            }
-
-            return head;
-        }
-
-        private String getCurrentVersion()
+        @Override
+        public void complete()
         {
-            for(ConfiguredObjectRecord record : _records.values())
+            for (Entry<String, String> entry : 
_missingAmqpExchanges.entrySet())
             {
-                if(record.getType().equals("VirtualHost"))
-                {
-                    return (String) 
record.getAttributes().get(VirtualHost.MODEL_VERSION);
-                }
-            }
-            return BrokerModel.MODEL_VERSION;
-        }
+                String name = entry.getKey();
+                String type = entry.getValue();
+                UUID id = _defaultExchangeIds.get(name);
 
+                Map<String, Object> attributes = new HashMap<String, Object>();
+                attributes.put(EXCHANGE_NAME, name);
+                attributes.put(EXCHANGE_TYPE, type);
+                attributes.put(EXCHANGE_DURABLE, true);
 
-        public void resolveObjects(ConfiguredObjectFactory factory, 
ConfiguredObject<?> root, ConfiguredObjectRecord... records)
-        {
-            Map<UUID, ConfiguredObject<?>> resolvedObjects = new HashMap<UUID, 
ConfiguredObject<?>>();
-            resolvedObjects.put(root.getId(), root);
-
-            Collection<ConfiguredObjectRecord> recordsWithUnresolvedParents = 
new ArrayList<ConfiguredObjectRecord>(Arrays.asList(records));
-            Collection<UnresolvedConfiguredObject<? extends ConfiguredObject>> 
recordsWithUnresolvedDependencies =
-                    new ArrayList<UnresolvedConfiguredObject<? extends 
ConfiguredObject>>();
+                ConfiguredObjectRecord record = new 
ConfiguredObjectRecordImpl(id, Exchange.class.getSimpleName(), attributes, 
Collections.singletonMap(_virtualHostRecord.getType(), _virtualHostRecord));
+                getUpdateMap().put(id, record);
 
-            boolean updatesMade;
+                getNextUpgrader().configuredObject(record);
 
-            do
-            {
-                updatesMade = false;
-                Iterator<ConfiguredObjectRecord> iter = 
recordsWithUnresolvedParents.iterator();
-                while (iter.hasNext())
-                {
-                    ConfiguredObjectRecord record = iter.next();
-                    Collection<ConfiguredObject<?>> parents = new 
ArrayList<ConfiguredObject<?>>();
-                    boolean foundParents = true;
-                    for (ConfiguredObjectRecord parent : 
record.getParents().values())
-                    {
-                        if (!resolvedObjects.containsKey(parent.getId()))
-                        {
-                            foundParents = false;
-                            break;
-                        }
-                        else
-                        {
-                            parents.add(resolvedObjects.get(parent.getId()));
-                        }
-                    }
-                    if (foundParents)
-                    {
-                        iter.remove();
-                        UnresolvedConfiguredObject<? extends ConfiguredObject> 
recovered =
-                                factory.recover(record, parents.toArray(new 
ConfiguredObject<?>[parents.size()]));
-                        Collection<ConfiguredObjectDependency<?>> dependencies 
=
-                                recovered.getUnresolvedDependencies();
-                        if (dependencies.isEmpty())
-                        {
-                            updatesMade = true;
-                            ConfiguredObject<?> resolved = recovered.resolve();
-                            resolvedObjects.put(resolved.getId(), resolved);
-                        }
-                        else
-                        {
-                            recordsWithUnresolvedDependencies.add(recovered);
-                        }
-                    }
-
-                }
-
-                Iterator<UnresolvedConfiguredObject<? extends 
ConfiguredObject>> unresolvedIter =
-                        recordsWithUnresolvedDependencies.iterator();
-
-                while(unresolvedIter.hasNext())
-                {
-                    UnresolvedConfiguredObject<? extends ConfiguredObject> 
unresolvedObject = unresolvedIter.next();
-                    Collection<ConfiguredObjectDependency<?>> dependencies =
-                            new 
ArrayList<ConfiguredObjectDependency<?>>(unresolvedObject.getUnresolvedDependencies());
+            }
 
-                    for(ConfiguredObjectDependency dependency : dependencies)
-                    {
-                        if(dependency instanceof ConfiguredObjectIdDependency)
-                        {
-                            UUID id = 
((ConfiguredObjectIdDependency)dependency).getId();
-                            if(resolvedObjects.containsKey(id))
-                            {
-                                dependency.resolve(resolvedObjects.get(id));
-                            }
-                        }
-                        else if(dependency instanceof 
ConfiguredObjectNameDependency)
-                        {
-                            ConfiguredObject<?> dependentObject = null;
-                            for(ConfiguredObject<?> parent : 
unresolvedObject.getParents())
-                            {
-                                dependentObject = 
parent.findConfiguredObject(dependency.getCategoryClass(), 
((ConfiguredObjectNameDependency)dependency).getName());
-                                if(dependentObject != null)
-                                {
-                                    break;
-                                }
-                            }
-                            if(dependentObject != null)
-                            {
-                                dependency.resolve(dependentObject);
-                            }
-                        }
-                        else
-                        {
-                            throw new ServerScopedRuntimeException("Unknown 
dependency type " + dependency.getClass().getSimpleName());
-                        }
-                    }
-                    if(unresolvedObject.getUnresolvedDependencies().isEmpty())
-                    {
-                        updatesMade = true;
-                        unresolvedIter.remove();
-                        ConfiguredObject<?> resolved = 
unresolvedObject.resolve();
-                        resolvedObjects.put(resolved.getId(), resolved);
-                    }
-                }
+            getNextUpgrader().complete();
+        }
 
-            } while(updatesMade && 
!(recordsWithUnresolvedDependencies.isEmpty() && 
recordsWithUnresolvedParents.isEmpty()));
+    }
 
-            if(!recordsWithUnresolvedDependencies.isEmpty())
-            {
-                throw new IllegalArgumentException("Cannot resolve some 
objects: " + recordsWithUnresolvedDependencies);
-            }
-            if(!recordsWithUnresolvedParents.isEmpty())
-            {
-                throw new IllegalArgumentException("Cannot resolve object 
because their parents cannot be found" + recordsWithUnresolvedParents);
-            }
-        }
+    public void perform(DurableConfigurationStore durableConfigurationStore)
+    {
+        String virtualHostCategory = VirtualHost.class.getSimpleName();
+        GenericStoreUpgrader upgraderHandler = new 
GenericStoreUpgrader(virtualHostCategory, VirtualHost.MODEL_VERSION, 
durableConfigurationStore, _upgraders);
+        upgraderHandler.upgrade();
 
+        new GenericRecoverer(_virtualHostNode, 
virtualHostCategory).recover(upgraderHandler.getRecords());
     }
 }

Copied: 
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java
 (from r1588966, 
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java)
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java?p2=qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java&p1=qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java&r1=1588966&r2=1589112&rev=1589112&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java
 Tue Apr 22 12:32:23 2014
@@ -18,11 +18,12 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.configuration.startup;
+package org.apache.qpid.server.store;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -31,9 +32,7 @@ import java.util.UUID;
 import junit.framework.TestCase;
 
 import org.apache.qpid.server.BrokerOptions;
-import org.apache.qpid.server.configuration.ConfiguredObjectRecoverer;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.configuration.RecovererProvider;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.LogRecorder;
@@ -49,6 +48,7 @@ import org.apache.qpid.server.model.Syst
 import org.apache.qpid.server.model.SystemContextImpl;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
+import org.apache.qpid.server.store.GenericRecoverer;
 import org.apache.qpid.server.store.UnresolvedConfiguredObject;
 
 public class BrokerRecovererTest extends TestCase
@@ -56,9 +56,9 @@ public class BrokerRecovererTest extends
     private ConfiguredObjectRecord _brokerEntry = 
mock(ConfiguredObjectRecord.class);
 
     private UUID _brokerId = UUID.randomUUID();
-    private AuthenticationProvider _authenticationProvider1;
+    private AuthenticationProvider<?> _authenticationProvider1;
     private UUID _authenticationProvider1Id = UUID.randomUUID();
-    private SystemContext _systemContext;
+    private SystemContext<?> _systemContext;
     private ConfiguredObjectFactory _configuredObjectFactory;
     private TaskExecutor _taskExecutor;
 
@@ -91,8 +91,14 @@ public class BrokerRecovererTest extends
     @Override
     protected void tearDown() throws Exception
     {
-        super.tearDown();
-        _taskExecutor.stop();
+        try
+        {
+            super.tearDown();
+        }
+        finally
+        {
+            _taskExecutor.stop();
+        }
     }
 
     public void testCreateBrokerAttributes()
@@ -115,8 +121,8 @@ public class BrokerRecovererTest extends
 
         when(_brokerEntry.getAttributes()).thenReturn(entryAttributes);
 
-        _systemContext.resolveObjects(_brokerEntry);
-        Broker broker = _systemContext.getBroker();
+        resolveObjects(_brokerEntry);
+        Broker<?> broker = _systemContext.getBroker();
 
         assertNotNull(broker);
 
@@ -171,7 +177,7 @@ public class BrokerRecovererTest extends
         UUID authProviderId = UUID.randomUUID();
         UUID portId = UUID.randomUUID();
 
-        _systemContext.resolveObjects(_brokerEntry, 
createAuthProviderRecord(authProviderId, "authProvider"), createPortRecord(
+        resolveObjects(_brokerEntry, createAuthProviderRecord(authProviderId, 
"authProvider"), createPortRecord(
                 portId,
                 5672,
                 "authProvider"));
@@ -188,7 +194,7 @@ public class BrokerRecovererTest extends
     {
         UUID authProviderId = UUID.randomUUID();
 
-        _systemContext.resolveObjects(_brokerEntry, 
createAuthProviderRecord(authProviderId, "authProvider"));
+        resolveObjects(_brokerEntry, createAuthProviderRecord(authProviderId, 
"authProvider"));
         Broker<?> broker = _systemContext.getBroker();
 
 
@@ -206,7 +212,7 @@ public class BrokerRecovererTest extends
         UUID authProvider2Id = UUID.randomUUID();
         UUID port2Id = UUID.randomUUID();
 
-        _systemContext.resolveObjects(_brokerEntry,
+        resolveObjects(_brokerEntry,
                                       createAuthProviderRecord(authProviderId, 
"authProvider"),
                                       createPortRecord(portId, 5672, 
"authProvider"),
                                       
createAuthProviderRecord(authProvider2Id, "authProvider2"),
@@ -228,7 +234,7 @@ public class BrokerRecovererTest extends
 
         UUID authProviderId = UUID.randomUUID();
 
-        _systemContext.resolveObjects(_brokerEntry, 
createGroupProviderRecord(authProviderId, "groupProvider"));
+        resolveObjects(_brokerEntry, createGroupProviderRecord(authProviderId, 
"groupProvider"));
         Broker<?> broker = _systemContext.getBroker();
 
 
@@ -253,7 +259,7 @@ public class BrokerRecovererTest extends
 
             try
             {
-                _systemContext.resolveObjects(_brokerEntry);
+                resolveObjects(_brokerEntry);
                 Broker<?> broker = _systemContext.getBroker();
                 broker.open();
                 fail("The broker creation should fail due to unsupported model 
version");
@@ -323,33 +329,10 @@ public class BrokerRecovererTest extends
         return String.valueOf(attributeValue);
     }
 
-    private  RecovererProvider createRecoveryProvider(final 
ConfiguredObjectRecord[] entries, final ConfiguredObject[] objectsToRecoverer)
+    private void resolveObjects(ConfiguredObjectRecord... records)
     {
-        RecovererProvider recovererProvider = new RecovererProvider()
-        {
-            @Override
-            public ConfiguredObjectRecoverer<? extends ConfiguredObject> 
getRecoverer(String type)
-            {
-                @SuppressWarnings({ "unchecked", "rawtypes" })
-                final ConfiguredObjectRecoverer<?  extends ConfiguredObject> 
recoverer = new ConfiguredObjectRecoverer()
-                {
-                    public ConfiguredObject create(RecovererProvider 
recovererProvider, ConfiguredObjectRecord entry, ConfiguredObject... parents)
-                    {
-                        for (int i = 0; i < entries.length; i++)
-                        {
-                            ConfiguredObjectRecord e = entries[i];
-                            if (entry == e)
-                            {
-                                return objectsToRecoverer[i];
-                            }
-                        }
-                        return null;
-                    }
-                };
-
-                return recoverer;
-            }
-        };
-        return recovererProvider;
+        GenericRecoverer recoverer = new GenericRecoverer(_systemContext, 
Broker.class.getSimpleName());
+        recoverer.recover(Arrays.asList(records));
     }
+
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to