Author: orudyy
Date: Tue Oct 28 19:37:46 2014
New Revision: 1634957

URL: http://svn.apache.org/r1634957
Log:
QPID-5650: Set alternate exchange only when queue creation argument 
'x-qpid-dlq-enabled' is set

Modified:
    
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
    
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java

Modified: 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java?rev=1634957&r1=1634956&r2=1634957&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
 Tue Oct 28 19:37:46 2014
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.store;
 
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -39,6 +38,7 @@ import org.apache.qpid.server.model.UUID
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.VirtualHostNode;
 import org.apache.qpid.server.queue.QueueArgumentsConverter;
+import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
 public class VirtualHostStoreUpgraderAndRecoverer
@@ -350,14 +350,14 @@ public class VirtualHostStoreUpgraderAnd
 
     private class Upgrader_0_4_to_2_0 extends StoreUpgraderPhase
     {
-        private static final String ARGUMENTS = "arguments";
-        private static final String DLQ_ENABLED_ARGUMENT = 
"x-qpid-dlq-enabled";
         private static final String ALTERNATE_EXCHANGE = "alternateExchange";
-        private static final String VIRTUAL_HOST_DLQ_ENABLED = 
"queue.deadLetterQueueEnabled";
 
         private Map<String, String> _missingAmqpExchanges = new 
HashMap<String, String>(DEFAULT_EXCHANGES);
         private ConfiguredObjectRecord _virtualHostRecord;
 
+        private Map<UUID, String> _queuesMissingAlternateExchange = new 
HashMap<>();
+        private Map<String, ConfiguredObjectRecord> _exchanges = new 
HashMap<>();
+
         public Upgrader_0_4_to_2_0()
         {
             super("modelVersion", "0.4", "2.0");
@@ -380,23 +380,30 @@ public class VirtualHostStoreUpgraderAnd
                 Map<String, Object> attributes = record.getAttributes();
                 String name = (String)attributes.get("name");
                 _missingAmqpExchanges.remove(name);
+                _exchanges.put(name, record);
             }
-            getUpdateMap().put(record.getId(), record);
+            else if("Queue".equals(record.getType()))
+            {
+                record = updateQueueRecordIfNecessary(record);
+            }
+            getNextUpgrader().configuredObject(record);
         }
 
         @Override
         public void complete()
         {
-            boolean virtualHostDLQEnabled =  
Boolean.parseBoolean(String.valueOf(_virtualHostRecord.getAttributes().get(VIRTUAL_HOST_DLQ_ENABLED)));
-            for (Iterator<Map.Entry<UUID, ConfiguredObjectRecord>> iterator = 
getUpdateMap().entrySet().iterator(); iterator.hasNext();)
+            for (UUID queueId : _queuesMissingAlternateExchange.keySet())
             {
-                Map.Entry<UUID, ConfiguredObjectRecord> entry = 
iterator.next();
-                ConfiguredObjectRecord record = entry.getValue();
-                if ("Queue".equals(record.getType()))
+                ConfiguredObjectRecord record = getUpdateMap().get(queueId);
+                if (record != null)
                 {
-                    record = upgradeQueueRecordIfNecessary(record, 
virtualHostDLQEnabled);
+                    String dleExchangeName = 
_queuesMissingAlternateExchange.get(queueId);
+                    ConfiguredObjectRecord alternateExchange = 
_exchanges.get(dleExchangeName);
+                    if (alternateExchange != null)
+                    {
+                        setAlternateExchangeAttribute(record, 
alternateExchange);
+                    }
                 }
-                getNextUpgrader().configuredObject(record);
             }
 
             for (Entry<String, String> entry : 
_missingAmqpExchanges.entrySet())
@@ -420,45 +427,51 @@ public class VirtualHostStoreUpgraderAnd
             getNextUpgrader().complete();
         }
 
-        private ConfiguredObjectRecord 
upgradeQueueRecordIfNecessary(ConfiguredObjectRecord record, boolean 
_virtualHostDLQEnabled)
+        private ConfiguredObjectRecord 
updateQueueRecordIfNecessary(ConfiguredObjectRecord record)
         {
-            Map<String, Object> attributes = new 
LinkedHashMap<>(record.getAttributes());
-            boolean queueArgumentDQLEnabledSet = false;
-            boolean queueDLQEnabled = false;
-
-            if (attributes.get(ARGUMENTS) instanceof Map)
-            {
-                Map<String,Object> arguments = 
(Map<String,Object>)attributes.get(ARGUMENTS);
-                queueArgumentDQLEnabledSet = 
arguments.containsKey(DLQ_ENABLED_ARGUMENT);
-                queueDLQEnabled = queueArgumentDQLEnabledSet ? 
Boolean.parseBoolean(String.valueOf(arguments.get(DLQ_ENABLED_ARGUMENT))) : 
false;
-            }
-
-            if( ((queueArgumentDQLEnabledSet && queueDLQEnabled) || 
(!queueArgumentDQLEnabledSet && _virtualHostDLQEnabled )) && 
attributes.get("alternateExchange") == null)
+            Map<String, Object> attributes = record.getAttributes();
+            boolean queueDLQEnabled = 
Boolean.parseBoolean(String.valueOf(attributes.get(AbstractVirtualHost.CREATE_DLQ_ON_CREATION)));
+            if(queueDLQEnabled && attributes.get(ALTERNATE_EXCHANGE) == null)
             {
                 Object queueName =  attributes.get("name");
-
                 if (queueName == null || "".equals(queueName))
                 {
                     throw new IllegalConfigurationException("Queue name is not 
found in queue configuration entry attributes: " + attributes);
                 }
 
                 String dleSuffix = 
System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, 
VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX);
-                ConfiguredObjectRecord alternateExchange = 
findConfiguredObjectRecord("Exchange", queueName + dleSuffix);
+                String dleExchangeName = queueName + dleSuffix;
 
-                if (alternateExchange != null)
+                ConfiguredObjectRecord exchangeRecord = 
findConfiguredObjectRecordInUpdateMap("Exchange", dleExchangeName);
+                if (exchangeRecord == null)
                 {
-                    attributes.put(ALTERNATE_EXCHANGE, 
alternateExchange.getId());
-                    record = new ConfiguredObjectRecordImpl(record.getId(), 
record.getType(), attributes, record.getParents());
-                    getUpdateMap().put(record.getId(), record);
+                    // add record to update Map if it is not there
+                    if (!getUpdateMap().containsKey(record.getId()))
+                    {
+                        getUpdateMap().put(record.getId(), record);
+                    }
+                    _queuesMissingAlternateExchange.put(record.getId(), 
dleExchangeName);
+                }
+                else
+                {
+                    record = setAlternateExchangeAttribute(record, 
exchangeRecord);
                 }
             }
             return record;
         }
 
-        private ConfiguredObjectRecord findConfiguredObjectRecord(String type, 
String name)
+        private ConfiguredObjectRecord 
setAlternateExchangeAttribute(ConfiguredObjectRecord record, 
ConfiguredObjectRecord alternateExchange)
+        {
+            Map<String, Object> attributes = new 
LinkedHashMap<>(record.getAttributes());
+            attributes.put(ALTERNATE_EXCHANGE, alternateExchange.getId());
+            record = new ConfiguredObjectRecordImpl(record.getId(), 
record.getType(), attributes, record.getParents());
+            getUpdateMap().put(record.getId(), record);
+            return record;
+        }
+
+        private ConfiguredObjectRecord 
findConfiguredObjectRecordInUpdateMap(String type, String name)
         {
-            Collection<ConfiguredObjectRecord> records = 
getUpdatedRecords().values();
-            for(ConfiguredObjectRecord record: records)
+            for(ConfiguredObjectRecord record: getUpdateMap().values())
             {
                 if (type.equals(record.getType()) && 
name.equals(record.getAttributes().get("name")))
                 {

Modified: 
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java?rev=1634957&r1=1634956&r2=1634957&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
 Tue Oct 28 19:37:46 2014
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.doAnsw
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.security.PrivilegedAction;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -49,6 +50,8 @@ import org.apache.qpid.test.utils.QpidTe
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import javax.security.auth.Subject;
+
 public class VirtualHostStoreUpgraderAndRecovererTest extends QpidTestCase
 {
     private ConfiguredObjectRecord _hostRecord;
@@ -114,40 +117,17 @@ public class VirtualHostStoreUpgraderAnd
         VirtualHostStoreUpgraderAndRecoverer upgraderAndRecoverer = new 
VirtualHostStoreUpgraderAndRecoverer(_virtualHostNode);
         upgraderAndRecoverer.perform(_durableConfigurationStore);
 
-        VirtualHost<?,?,?>  host = _virtualHostNode.getVirtualHost();
-        host.open();
-
-        assertNotNull("Virtual host is not recovered", host);
-        Queue<?> recoveredQueue = host.findConfiguredObject(Queue.class, 
"test");
-        assertNotNull("Queue is not recovered", recoveredQueue);
-
-        Queue<?> recoveredDLQ = host.findConfiguredObject(Queue.class, 
"test_DLQ");
-        assertNotNull("DLQ queue is not recovered", recoveredDLQ);
-
-        Exchange<?> recoveredDLE = host.findConfiguredObject(Exchange.class, 
"test_DLE");
-        assertNotNull("DLE exchange is not recovered", recoveredDLE);
-
-        assertEquals("Unexpected alternative exchange", recoveredDLE, 
recoveredQueue.getAlternateExchange());
-    }
-
-    public void testRecoverQueueWithDLQEnabledOnVirtualHost() throws Exception
-    {
-        
_hostRecord.getAttributes().put(VirtualHost.QUEUE_DEAD_LETTER_QUEUE_ENABLED, 
"true");
-
-        ConfiguredObjectRecord queue = mockQueue("test", null);
-        ConfiguredObjectRecord dlq = mockQueue("test_DLQ", 
Collections.<String,Object>singletonMap("x-qpid-dlq-enabled", "false"));
-        ConfiguredObjectRecord dle = mockExchange("test_DLE", "fanout");
-        ConfiguredObjectRecord dlqBinding = mockBinding("dlq", dlq, dle);
-        ConfiguredObjectRecord directExchange = 
mock(ConfiguredObjectRecord.class);
-        
when(directExchange.getId()).thenReturn(UUIDGenerator.generateExchangeUUID("amq.direct",
 "test"));
-        ConfiguredObjectRecord queueBinding =  mockBinding("test", queue, 
directExchange);
-        setUpVisit(_hostRecord, queue, dlq, dle, queueBinding, dlqBinding);
-
-        VirtualHostStoreUpgraderAndRecoverer upgraderAndRecoverer = new 
VirtualHostStoreUpgraderAndRecoverer(_virtualHostNode);
-        upgraderAndRecoverer.perform(_durableConfigurationStore);
-
-        VirtualHost<?,?,?>  host = _virtualHostNode.getVirtualHost();
-        host.open();
+        final VirtualHost<?,?,?>  host = _virtualHostNode.getVirtualHost();
+        
Subject.doAs(org.apache.qpid.server.security.SecurityManager.getSubjectWithAddedSystemRights(),
 new PrivilegedAction<Void>()
+                {
+                    @Override
+                    public Void run()
+                    {
+                        host.open();
+                        return null;
+                    }
+                }
+        );
 
         assertNotNull("Virtual host is not recovered", host);
         Queue<?> recoveredQueue = host.findConfiguredObject(Queue.class, 
"test");



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to