Author: orudyy
Date: Tue Oct 28 19:37:46 2014
New Revision: 1634957
URL: http://svn.apache.org/r1634957
Log:
QPID-5650: Set alternate exchange only when queue creation argument
'x-qpid-dlq-enabled' is set
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java?rev=1634957&r1=1634956&r2=1634957&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java Tue Oct 28 19:37:46 2014
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.store;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -39,6 +38,7 @@ import org.apache.qpid.server.model.UUID
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
+import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class VirtualHostStoreUpgraderAndRecoverer
@@ -350,14 +350,14 @@ public class VirtualHostStoreUpgraderAnd
private class Upgrader_0_4_to_2_0 extends StoreUpgraderPhase
{
- private static final String ARGUMENTS = "arguments";
- private static final String DLQ_ENABLED_ARGUMENT =
"x-qpid-dlq-enabled";
private static final String ALTERNATE_EXCHANGE = "alternateExchange";
- private static final String VIRTUAL_HOST_DLQ_ENABLED =
"queue.deadLetterQueueEnabled";
private Map<String, String> _missingAmqpExchanges = new
HashMap<String, String>(DEFAULT_EXCHANGES);
private ConfiguredObjectRecord _virtualHostRecord;
+ private Map<UUID, String> _queuesMissingAlternateExchange = new
HashMap<>();
+ private Map<String, ConfiguredObjectRecord> _exchanges = new
HashMap<>();
+
public Upgrader_0_4_to_2_0()
{
super("modelVersion", "0.4", "2.0");
@@ -380,23 +380,30 @@ public class VirtualHostStoreUpgraderAnd
Map<String, Object> attributes = record.getAttributes();
String name = (String)attributes.get("name");
_missingAmqpExchanges.remove(name);
+ _exchanges.put(name, record);
}
- getUpdateMap().put(record.getId(), record);
+ else if("Queue".equals(record.getType()))
+ {
+ record = updateQueueRecordIfNecessary(record);
+ }
+ getNextUpgrader().configuredObject(record);
}
@Override
public void complete()
{
- boolean virtualHostDLQEnabled =
Boolean.parseBoolean(String.valueOf(_virtualHostRecord.getAttributes().get(VIRTUAL_HOST_DLQ_ENABLED)));
- for (Iterator<Map.Entry<UUID, ConfiguredObjectRecord>> iterator =
getUpdateMap().entrySet().iterator(); iterator.hasNext();)
+ for (UUID queueId : _queuesMissingAlternateExchange.keySet())
{
- Map.Entry<UUID, ConfiguredObjectRecord> entry =
iterator.next();
- ConfiguredObjectRecord record = entry.getValue();
- if ("Queue".equals(record.getType()))
+ ConfiguredObjectRecord record = getUpdateMap().get(queueId);
+ if (record != null)
{
- record = upgradeQueueRecordIfNecessary(record,
virtualHostDLQEnabled);
+ String dleExchangeName =
_queuesMissingAlternateExchange.get(queueId);
+ ConfiguredObjectRecord alternateExchange =
_exchanges.get(dleExchangeName);
+ if (alternateExchange != null)
+ {
+ setAlternateExchangeAttribute(record,
alternateExchange);
+ }
}
- getNextUpgrader().configuredObject(record);
}
for (Entry<String, String> entry :
_missingAmqpExchanges.entrySet())
@@ -420,45 +427,51 @@ public class VirtualHostStoreUpgraderAnd
getNextUpgrader().complete();
}
- private ConfiguredObjectRecord upgradeQueueRecordIfNecessary(ConfiguredObjectRecord record, boolean _virtualHostDLQEnabled)
+ private ConfiguredObjectRecord updateQueueRecordIfNecessary(ConfiguredObjectRecord record)
{
- Map<String, Object> attributes = new
LinkedHashMap<>(record.getAttributes());
- boolean queueArgumentDQLEnabledSet = false;
- boolean queueDLQEnabled = false;
-
- if (attributes.get(ARGUMENTS) instanceof Map)
- {
- Map<String,Object> arguments =
(Map<String,Object>)attributes.get(ARGUMENTS);
- queueArgumentDQLEnabledSet =
arguments.containsKey(DLQ_ENABLED_ARGUMENT);
- queueDLQEnabled = queueArgumentDQLEnabledSet ?
Boolean.parseBoolean(String.valueOf(arguments.get(DLQ_ENABLED_ARGUMENT))) :
false;
- }
-
- if( ((queueArgumentDQLEnabledSet && queueDLQEnabled) ||
(!queueArgumentDQLEnabledSet && _virtualHostDLQEnabled )) &&
attributes.get("alternateExchange") == null)
+ Map<String, Object> attributes = record.getAttributes();
+ boolean queueDLQEnabled = Boolean.parseBoolean(String.valueOf(attributes.get(AbstractVirtualHost.CREATE_DLQ_ON_CREATION)));
+ if(queueDLQEnabled && attributes.get(ALTERNATE_EXCHANGE) == null)
{
Object queueName = attributes.get("name");
-
if (queueName == null || "".equals(queueName))
{
throw new IllegalConfigurationException("Queue name is not found in queue configuration entry attributes: " + attributes);
}
String dleSuffix =
System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX,
VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX);
- ConfiguredObjectRecord alternateExchange =
findConfiguredObjectRecord("Exchange", queueName + dleSuffix);
+ String dleExchangeName = queueName + dleSuffix;
- if (alternateExchange != null)
+ ConfiguredObjectRecord exchangeRecord =
findConfiguredObjectRecordInUpdateMap("Exchange", dleExchangeName);
+ if (exchangeRecord == null)
{
- attributes.put(ALTERNATE_EXCHANGE,
alternateExchange.getId());
- record = new ConfiguredObjectRecordImpl(record.getId(),
record.getType(), attributes, record.getParents());
- getUpdateMap().put(record.getId(), record);
+ // add record to update Map if it is not there
+ if (!getUpdateMap().containsKey(record.getId()))
+ {
+ getUpdateMap().put(record.getId(), record);
+ }
+ _queuesMissingAlternateExchange.put(record.getId(),
dleExchangeName);
+ }
+ else
+ {
+ record = setAlternateExchangeAttribute(record,
exchangeRecord);
}
}
return record;
}
- private ConfiguredObjectRecord findConfiguredObjectRecord(String type,
String name)
+ private ConfiguredObjectRecord
setAlternateExchangeAttribute(ConfiguredObjectRecord record,
ConfiguredObjectRecord alternateExchange)
+ {
+ Map<String, Object> attributes = new
LinkedHashMap<>(record.getAttributes());
+ attributes.put(ALTERNATE_EXCHANGE, alternateExchange.getId());
+ record = new ConfiguredObjectRecordImpl(record.getId(),
record.getType(), attributes, record.getParents());
+ getUpdateMap().put(record.getId(), record);
+ return record;
+ }
+
+ private ConfiguredObjectRecord
findConfiguredObjectRecordInUpdateMap(String type, String name)
{
- Collection<ConfiguredObjectRecord> records =
getUpdatedRecords().values();
- for(ConfiguredObjectRecord record: records)
+ for(ConfiguredObjectRecord record: getUpdateMap().values())
{
if (type.equals(record.getType()) &&
name.equals(record.getAttributes().get("name")))
{
Modified:
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java?rev=1634957&r1=1634956&r2=1634957&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java Tue Oct 28 19:37:46 2014
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.doAnsw
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.security.PrivilegedAction;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -49,6 +50,8 @@ import org.apache.qpid.test.utils.QpidTe
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import javax.security.auth.Subject;
+
public class VirtualHostStoreUpgraderAndRecovererTest extends QpidTestCase
{
private ConfiguredObjectRecord _hostRecord;
@@ -114,40 +117,17 @@ public class VirtualHostStoreUpgraderAnd
VirtualHostStoreUpgraderAndRecoverer upgraderAndRecoverer = new
VirtualHostStoreUpgraderAndRecoverer(_virtualHostNode);
upgraderAndRecoverer.perform(_durableConfigurationStore);
- VirtualHost<?,?,?> host = _virtualHostNode.getVirtualHost();
- host.open();
-
- assertNotNull("Virtual host is not recovered", host);
- Queue<?> recoveredQueue = host.findConfiguredObject(Queue.class,
"test");
- assertNotNull("Queue is not recovered", recoveredQueue);
-
- Queue<?> recoveredDLQ = host.findConfiguredObject(Queue.class,
"test_DLQ");
- assertNotNull("DLQ queue is not recovered", recoveredDLQ);
-
- Exchange<?> recoveredDLE = host.findConfiguredObject(Exchange.class,
"test_DLE");
- assertNotNull("DLE exchange is not recovered", recoveredDLE);
-
- assertEquals("Unexpected alternative exchange", recoveredDLE,
recoveredQueue.getAlternateExchange());
- }
-
- public void testRecoverQueueWithDLQEnabledOnVirtualHost() throws Exception
- {
- _hostRecord.getAttributes().put(VirtualHost.QUEUE_DEAD_LETTER_QUEUE_ENABLED, "true");
-
- ConfiguredObjectRecord queue = mockQueue("test", null);
- ConfiguredObjectRecord dlq = mockQueue("test_DLQ",
Collections.<String,Object>singletonMap("x-qpid-dlq-enabled", "false"));
- ConfiguredObjectRecord dle = mockExchange("test_DLE", "fanout");
- ConfiguredObjectRecord dlqBinding = mockBinding("dlq", dlq, dle);
- ConfiguredObjectRecord directExchange = mock(ConfiguredObjectRecord.class);
- when(directExchange.getId()).thenReturn(UUIDGenerator.generateExchangeUUID("amq.direct", "test"));
- ConfiguredObjectRecord queueBinding = mockBinding("test", queue,
directExchange);
- setUpVisit(_hostRecord, queue, dlq, dle, queueBinding, dlqBinding);
-
- VirtualHostStoreUpgraderAndRecoverer upgraderAndRecoverer = new
VirtualHostStoreUpgraderAndRecoverer(_virtualHostNode);
- upgraderAndRecoverer.perform(_durableConfigurationStore);
-
- VirtualHost<?,?,?> host = _virtualHostNode.getVirtualHost();
- host.open();
+ final VirtualHost<?,?,?> host = _virtualHostNode.getVirtualHost();
+ Subject.doAs(org.apache.qpid.server.security.SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Void>()
+ {
+ @Override
+ public Void run()
+ {
+ host.open();
+ return null;
+ }
+ }
+ );
assertNotNull("Virtual host is not recovered", host);
Queue<?> recoveredQueue = host.findConfiguredObject(Queue.class,
"test");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]