Author: orudyy
Date: Mon Oct 27 22:32:29 2014
New Revision: 1634713

URL: http://svn.apache.org/r1634713
Log:
QPID-5650: Preserve alternate exchange on upgrade of queue with 'dead letter queue'

Added:
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
Modified:
    qpid/trunk/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java

Modified: 
qpid/trunk/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java?rev=1634713&r1=1634712&r2=1634713&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
 Mon Oct 27 22:32:29 2014
@@ -360,9 +360,6 @@ public class BDBUpgradeTest extends Qpid
     }
 
     /**
-     *
-     * TODO (QPID-5650) Resolve so this test can be reenabled.
-     *
      * Test that the queue configured to have a DLQ was recovered and has the alternate exchange
      * and max delivery count, the DLE exists, the DLQ exists with no max delivery count, the
      * DLQ is bound to the DLE, and that the DLQ does not itself have a DLQ.
@@ -370,7 +367,7 @@ public class BDBUpgradeTest extends Qpid
      * DLQs are NOT enabled at the virtualhost level, we are testing recovery of the arguments
      * that turned it on for this specific queue.
      */
-    public void xtestRecoveryOfQueueWithDLQ() throws Exception
+    public void testRecoveryOfQueueWithDLQ() throws Exception
     {
         JMXTestUtils jmxUtils = null;
         try

Modified: 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java?rev=1634713&r1=1634712&r2=1634713&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
 Mon Oct 27 22:32:29 2014
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.store;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -28,6 +29,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.UUID;
 
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.filter.FilterSupport;
 import org.apache.qpid.server.model.Binding;
 import org.apache.qpid.server.model.Exchange;
@@ -36,6 +39,7 @@ import org.apache.qpid.server.model.UUID
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.VirtualHostNode;
 import org.apache.qpid.server.queue.QueueArgumentsConverter;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
 public class VirtualHostStoreUpgraderAndRecoverer
 {
@@ -346,6 +350,11 @@ public class VirtualHostStoreUpgraderAnd
 
     private class Upgrader_0_4_to_2_0 extends StoreUpgraderPhase
     {
+        private static final String ARGUMENTS = "arguments";
+        private static final String DLQ_ENABLED_ARGUMENT = 
"x-qpid-dlq-enabled";
+        private static final String ALTERNATE_EXCHANGE = "alternateExchange";
+        private static final String VIRTUAL_HOST_DLQ_ENABLED = 
"queue.deadLetterQueueEnabled";
+
         private Map<String, String> _missingAmqpExchanges = new 
HashMap<String, String>(DEFAULT_EXCHANGES);
         private ConfiguredObjectRecord _virtualHostRecord;
 
@@ -372,12 +381,24 @@ public class VirtualHostStoreUpgraderAnd
                 String name = (String)attributes.get("name");
                 _missingAmqpExchanges.remove(name);
             }
-            getNextUpgrader().configuredObject(record);
+            getUpdateMap().put(record.getId(), record);
         }
 
         @Override
         public void complete()
         {
+            boolean virtualHostDLQEnabled =  
Boolean.parseBoolean(String.valueOf(_virtualHostRecord.getAttributes().get(VIRTUAL_HOST_DLQ_ENABLED)));
+            for (Iterator<Map.Entry<UUID, ConfiguredObjectRecord>> iterator = 
getUpdateMap().entrySet().iterator(); iterator.hasNext();)
+            {
+                Map.Entry<UUID, ConfiguredObjectRecord> entry = 
iterator.next();
+                ConfiguredObjectRecord record = entry.getValue();
+                if ("Queue".equals(record.getType()))
+                {
+                    record = upgradeQueueRecordIfNecessary(record, 
virtualHostDLQEnabled);
+                }
+                getNextUpgrader().configuredObject(record);
+            }
+
             for (Entry<String, String> entry : 
_missingAmqpExchanges.entrySet())
             {
                 String name = entry.getKey();
@@ -399,6 +420,54 @@ public class VirtualHostStoreUpgraderAnd
             getNextUpgrader().complete();
         }
 
+        private ConfiguredObjectRecord 
upgradeQueueRecordIfNecessary(ConfiguredObjectRecord record, boolean 
_virtualHostDLQEnabled)
+        {
+            Map<String, Object> attributes = new 
LinkedHashMap<>(record.getAttributes());
+            boolean queueArgumentDQLEnabledSet = false;
+            boolean queueDLQEnabled = false;
+
+            if (attributes.get(ARGUMENTS) instanceof Map)
+            {
+                Map<String,Object> arguments = 
(Map<String,Object>)attributes.get(ARGUMENTS);
+                queueArgumentDQLEnabledSet = 
arguments.containsKey(DLQ_ENABLED_ARGUMENT);
+                queueDLQEnabled = queueArgumentDQLEnabledSet ? 
Boolean.parseBoolean(String.valueOf(arguments.get(DLQ_ENABLED_ARGUMENT))) : 
false;
+            }
+
+            if( ((queueArgumentDQLEnabledSet && queueDLQEnabled) || 
(!queueArgumentDQLEnabledSet && _virtualHostDLQEnabled )) && 
attributes.get("alternateExchange") == null)
+            {
+                Object queueName =  attributes.get("name");
+
+                if (queueName == null || "".equals(queueName))
+                {
+                    throw new IllegalConfigurationException("Queue name is not 
found in queue configuration entry attributes: " + attributes);
+                }
+
+                String dleSuffix = 
System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, 
VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX);
+                ConfiguredObjectRecord alternateExchange = 
findConfiguredObjectRecord("Exchange", queueName + dleSuffix);
+
+                if (alternateExchange != null)
+                {
+                    attributes.put(ALTERNATE_EXCHANGE, 
alternateExchange.getId());
+                    record = new ConfiguredObjectRecordImpl(record.getId(), 
record.getType(), attributes, record.getParents());
+                    getUpdateMap().put(record.getId(), record);
+                }
+            }
+            return record;
+        }
+
+        private ConfiguredObjectRecord findConfiguredObjectRecord(String type, 
String name)
+        {
+            Collection<ConfiguredObjectRecord> records = 
getUpdatedRecords().values();
+            for(ConfiguredObjectRecord record: records)
+            {
+                if (type.equals(record.getType()) && 
name.equals(record.getAttributes().get("name")))
+                {
+                    return record;
+                }
+            }
+            return null;
+        }
+
     }
 
     private class Upgrader_2_0_to_2_1 extends StoreUpgraderPhase

Added: 
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java?rev=1634713&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
 (added)
+++ 
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
 Mon Oct 27 22:32:29 2014
@@ -0,0 +1,239 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import static java.util.Arrays.asList;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.SystemConfig;
+import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
+import org.apache.qpid.server.virtualhostnode.TestVirtualHostNode;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class VirtualHostStoreUpgraderAndRecovererTest extends QpidTestCase
+{
+    private ConfiguredObjectRecord _hostRecord;
+    private CurrentThreadTaskExecutor _taskExecutor;
+    private UUID _hostId;
+    private VirtualHostNode _virtualHostNode;
+    private DurableConfigurationStore _durableConfigurationStore;
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+
+        UUID hostParentId = UUID.randomUUID();
+        _hostId = UUID.randomUUID();
+        Map<String, Object> hostAttributes = new HashMap<>();
+        hostAttributes.put("modelVersion", "0.0");
+        hostAttributes.put("name", "test");
+        hostAttributes.put("type", TestMemoryVirtualHost.VIRTUAL_HOST_TYPE);
+
+        _hostRecord = mock(ConfiguredObjectRecord.class);
+        when(_hostRecord.getId()).thenReturn(_hostId);
+        when(_hostRecord.getAttributes()).thenReturn(hostAttributes);
+        when(_hostRecord.getType()).thenReturn("VirtualHost");
+        when(_hostRecord.toString()).thenReturn("VirtualHost[name='test',id='" 
+ _hostId + "']");
+
+        _taskExecutor = new CurrentThreadTaskExecutor();
+        _taskExecutor.start();
+
+        SystemConfig<?> systemConfig = mock(SystemConfig.class);
+        when(systemConfig.getEventLogger()).thenReturn(new EventLogger());
+
+        Broker<?> broker = mock(Broker.class);
+        when(broker.getParent(SystemConfig.class)).thenReturn(systemConfig);
+        when(broker.getTaskExecutor()).thenReturn(_taskExecutor);
+        when(broker.getModel()).thenReturn(BrokerModel.getInstance());
+
+        _durableConfigurationStore = mock(DurableConfigurationStore.class);
+        Map<String,Object> attributes = new HashMap<>();
+        attributes.put(VirtualHostNode.ID, hostParentId);
+        attributes.put(VirtualHostNode.NAME, "test");
+        _virtualHostNode = new TestVirtualHostNode(broker, attributes, 
_durableConfigurationStore);
+    }
+
+    @Override
+    public void tearDown()throws Exception
+    {
+        super.tearDown();
+        _taskExecutor.stopImmediately();
+    }
+
+    public void testRecoverQueueWithDLQEnabled() throws Exception
+    {
+        ConfiguredObjectRecord queue = mockQueue("test", 
Collections.<String,Object>singletonMap("x-qpid-dlq-enabled", "true"));
+        ConfiguredObjectRecord dlq = mockQueue("test_DLQ", 
Collections.<String,Object>singletonMap("x-qpid-dlq-enabled", "false"));
+        ConfiguredObjectRecord dle = mockExchange("test_DLE", "fanout");
+        ConfiguredObjectRecord dlqBinding = mockBinding("dlq", dlq, dle);
+        ConfiguredObjectRecord directExchange = 
mock(ConfiguredObjectRecord.class);
+        
when(directExchange.getId()).thenReturn(UUIDGenerator.generateExchangeUUID("amq.direct",
 "test"));
+        ConfiguredObjectRecord queueBinding =  mockBinding("test", queue, 
directExchange);
+        setUpVisit(_hostRecord, queue, dlq, dle, queueBinding, dlqBinding);
+
+        VirtualHostStoreUpgraderAndRecoverer upgraderAndRecoverer = new 
VirtualHostStoreUpgraderAndRecoverer(_virtualHostNode);
+        upgraderAndRecoverer.perform(_durableConfigurationStore);
+
+        VirtualHost<?,?,?>  host = _virtualHostNode.getVirtualHost();
+        host.open();
+
+        assertNotNull("Virtual host is not recovered", host);
+        Queue<?> recoveredQueue = host.findConfiguredObject(Queue.class, 
"test");
+        assertNotNull("Queue is not recovered", recoveredQueue);
+
+        Queue<?> recoveredDLQ = host.findConfiguredObject(Queue.class, 
"test_DLQ");
+        assertNotNull("DLQ queue is not recovered", recoveredDLQ);
+
+        Exchange<?> recoveredDLE = host.findConfiguredObject(Exchange.class, 
"test_DLE");
+        assertNotNull("DLE exchange is not recovered", recoveredDLE);
+
+        assertEquals("Unexpected alternative exchange", recoveredDLE, 
recoveredQueue.getAlternateExchange());
+    }
+
+    public void testRecoverQueueWithDLQEnabledOnVirtualHost() throws Exception
+    {
+        
_hostRecord.getAttributes().put(VirtualHost.QUEUE_DEAD_LETTER_QUEUE_ENABLED, 
"true");
+
+        ConfiguredObjectRecord queue = mockQueue("test", null);
+        ConfiguredObjectRecord dlq = mockQueue("test_DLQ", 
Collections.<String,Object>singletonMap("x-qpid-dlq-enabled", "false"));
+        ConfiguredObjectRecord dle = mockExchange("test_DLE", "fanout");
+        ConfiguredObjectRecord dlqBinding = mockBinding("dlq", dlq, dle);
+        ConfiguredObjectRecord directExchange = 
mock(ConfiguredObjectRecord.class);
+        
when(directExchange.getId()).thenReturn(UUIDGenerator.generateExchangeUUID("amq.direct",
 "test"));
+        ConfiguredObjectRecord queueBinding =  mockBinding("test", queue, 
directExchange);
+        setUpVisit(_hostRecord, queue, dlq, dle, queueBinding, dlqBinding);
+
+        VirtualHostStoreUpgraderAndRecoverer upgraderAndRecoverer = new 
VirtualHostStoreUpgraderAndRecoverer(_virtualHostNode);
+        upgraderAndRecoverer.perform(_durableConfigurationStore);
+
+        VirtualHost<?,?,?>  host = _virtualHostNode.getVirtualHost();
+        host.open();
+
+        assertNotNull("Virtual host is not recovered", host);
+        Queue<?> recoveredQueue = host.findConfiguredObject(Queue.class, 
"test");
+        assertNotNull("Queue is not recovered", recoveredQueue);
+
+        Queue<?> recoveredDLQ = host.findConfiguredObject(Queue.class, 
"test_DLQ");
+        assertNotNull("DLQ queue is not recovered", recoveredDLQ);
+
+        Exchange<?> recoveredDLE = host.findConfiguredObject(Exchange.class, 
"test_DLE");
+        assertNotNull("DLE exchange is not recovered", recoveredDLE);
+
+        assertEquals("Unexpected alternative exchange", recoveredDLE, 
recoveredQueue.getAlternateExchange());
+    }
+
+    private ConfiguredObjectRecord mockBinding(String bindingName, 
ConfiguredObjectRecord queue, ConfiguredObjectRecord exchange)
+    {
+        ConfiguredObjectRecord binding = mock(ConfiguredObjectRecord.class);
+        when(binding.getId()).thenReturn(UUID.randomUUID());
+        
when(binding.getType()).thenReturn("org.apache.qpid.server.model.Binding");
+        Map<String,UUID> parents = new HashMap<>();
+        parents.put("Queue", queue.getId());
+        parents.put("Exchange", exchange.getId());
+        when(binding.getParents()).thenReturn(parents);
+        when(binding.toString()).thenReturn("Binding[" + bindingName + "]");
+
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put("durable", true);
+        attributes.put("name", bindingName);
+        when(binding.getAttributes()).thenReturn(attributes);
+        return binding;
+    }
+
+    private ConfiguredObjectRecord mockExchange(String exchangeName, String 
exchangeType)
+    {
+        ConfiguredObjectRecord exchange = mock(ConfiguredObjectRecord.class);
+        when(exchange.getId()).thenReturn(UUID.randomUUID());
+        
when(exchange.getType()).thenReturn("org.apache.qpid.server.model.Exchange");
+        
when(exchange.getParents()).thenReturn(Collections.singletonMap("VirtualHost", 
_hostId));
+        when(exchange.toString()).thenReturn("Exchange[" + exchangeName + "]");
+
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put("type", exchangeType);
+        attributes.put("durable", true);
+        attributes.put("name", exchangeName);
+        when(exchange.getAttributes()).thenReturn(attributes);
+        return exchange;
+    }
+
+    private ConfiguredObjectRecord mockQueue(String queueName, Map<String, 
Object> arguments)
+    {
+        ConfiguredObjectRecord queue = mock(ConfiguredObjectRecord.class);
+        when(queue.getId()).thenReturn(UUID.randomUUID());
+        when(queue.getType()).thenReturn("org.apache.qpid.server.model.Queue");
+        
when(queue.getParents()).thenReturn(Collections.singletonMap("VirtualHost", 
_hostId));
+        when(queue.toString()).thenReturn("Queue[" + queueName + "]");
+
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put("durable", true);
+        attributes.put("name", queueName);
+        if (arguments != null)
+        {
+            attributes.put("arguments", arguments);
+        }
+        when(queue.getAttributes()).thenReturn(attributes);
+        return queue;
+    }
+
+
+    private void setUpVisit(final ConfiguredObjectRecord... records)
+    {
+        doAnswer(new Answer()
+        {
+            @Override
+            public Object answer(final InvocationOnMock invocation) throws 
Throwable
+            {
+                Iterator<ConfiguredObjectRecord> iterator = 
asList(records).iterator();
+                ConfiguredObjectRecordHandler handler = 
(ConfiguredObjectRecordHandler) invocation.getArguments()[0];
+                handler.begin();
+                boolean handlerContinue = true;
+                while(iterator.hasNext() && handlerContinue)
+                {
+                    handlerContinue = handler.handle(iterator.next());
+                }
+                handler.end();
+                return null;
+            }
+        
}).when(_durableConfigurationStore).visitConfiguredObjectRecords(any(ConfiguredObjectRecordHandler.class));
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to