Author: kwall
Date: Fri Jun 22 16:13:52 2012
New Revision: 1352946
URL: http://svn.apache.org/viewvc?rev=1352946&view=rev
Log:
NO-JIRA: Implement ManagedQueue#set/getAlternateExchange in JMX management
layer. Implement persistence of alternate exchange (by the UUID) in the store
layer.
Added:
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFinder.java
Modified:
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java
qpid/branches/java-config-and-management/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java
qpid/branches/java-config-and-management/qpid/java/test-profiles/JavaPre010Excludes
qpid/branches/java-config-and-management/qpid/java/test-profiles/JavaTransientExcludes
Modified:
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java?rev=1352946&r1=1352945&r2=1352946&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
Fri Jun 22 16:13:52 2012
@@ -48,6 +48,8 @@ import org.apache.qpid.server.jmx.AMQMan
import org.apache.qpid.server.jmx.ManagedObject;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.ConfiguredObjectFinder;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueNotificationListener;
@@ -274,14 +276,25 @@ public class QueueMBean extends AMQManag
_queue.setAttribute(Queue.EXCLUSIVE, isExclusive(), exclusive);
}
- public void setAlternateExchange(String exchangeName)
+ public void setAlternateExchange(String exchangeName) throws
OperationsException
{
- // TODO - implement setAlternateExchange()
+ if (exchangeName == null || "".equals(exchangeName))
+ {
+ _queue.setAttribute(Queue.ALTERNATE_EXCHANGE,
getAlternateExchange(), null);
+ }
+ else
+ {
+ VirtualHost virtualHost = _queue.getParent(VirtualHost.class);
+ Exchange exchange = findExchangeFromExchangeName(virtualHost,
exchangeName);
+
+ _queue.setAttribute(Queue.ALTERNATE_EXCHANGE,
getAlternateExchange(), exchange);
+ }
}
public String getAlternateExchange()
{
- return null; // TODO - implement getAlternateExchange()
+ Exchange alternateExchange = (Exchange)
_queue.getAttribute(Queue.ALTERNATE_EXCHANGE);
+ return alternateExchange == null ? null : alternateExchange.getName();
}
public TabularData viewMessages(int fromIndex, int toIndex)
@@ -464,22 +477,7 @@ public class QueueMBean extends AMQManag
}
VirtualHost vhost = _queue.getParent(VirtualHost.class);
- Queue destinationQueue = null;
- for(Queue q : vhost.getQueues())
- {
- if(q.getName().equals(toQueue))
- {
- destinationQueue = q;
- break;
- }
- }
- if(destinationQueue == null)
- {
- throw new OperationsException("No such queue \""+toQueue+"\"");
- }
-
- final Queue queue = destinationQueue;
-
+ final Queue destinationQueue = findQueueFromQueueName(vhost, toQueue);
vhost.executeTransaction(new VirtualHost.TransactionalOperation()
{
@@ -498,7 +496,7 @@ public class QueueMBean extends AMQManag
if ((messageId >= fromMessageId)
&& (messageId <= toMessageId))
{
- txn.move(entry, queue);
+ txn.move(entry, destinationQueue);
}
}
@@ -551,22 +549,7 @@ public class QueueMBean extends AMQManag
}
VirtualHost vhost = _queue.getParent(VirtualHost.class);
- Queue destinationQueue = null;
- for(Queue q : vhost.getQueues())
- {
- if(q.getName().equals(toQueue))
- {
- destinationQueue = q;
- break;
- }
- }
- if(destinationQueue == null)
- {
- throw new OperationsException("No such queue \""+toQueue+"\"");
- }
-
- final Queue queue = destinationQueue;
-
+ final Queue destinationQueue = findQueueFromQueueName(vhost, toQueue);
vhost.executeTransaction(new VirtualHost.TransactionalOperation()
{
@@ -585,7 +568,7 @@ public class QueueMBean extends AMQManag
if ((messageId >= fromMessageId)
&& (messageId <= toMessageId))
{
- txn.copy(entry, queue);
+ txn.copy(entry, destinationQueue);
}
}
@@ -680,6 +663,30 @@ public class QueueMBean extends AMQManag
{
_queue.setAttribute(Queue.DESCRIPTION, getDescription(), description);
}
-}
+ private Queue findQueueFromQueueName(VirtualHost virtualHost, String
queueName) throws OperationsException
+ {
+ Queue queue =
ConfiguredObjectFinder.findConfiguredObjectByName(virtualHost.getQueues(),
queueName);
+ if (queue == null)
+ {
+ throw new OperationsException("No such queue \""+queueName+"\"");
+ }
+ else
+ {
+ return queue;
+ }
+ }
+ private Exchange findExchangeFromExchangeName(VirtualHost virtualHost,
String exchangeName) throws OperationsException
+ {
+ Exchange exchange =
ConfiguredObjectFinder.findConfiguredObjectByName(virtualHost.getExchanges(),
exchangeName);
+ if (exchange == null)
+ {
+ throw new OperationsException("No such exchange
\""+exchangeName+"\"");
+ }
+ else
+ {
+ return exchange;
+ }
+ }
+}
Modified:
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java?rev=1352946&r1=1352945&r2=1352946&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
Fri Jun 22 16:13:52 2012
@@ -20,9 +20,17 @@ package org.apache.qpid.server.jmx.mbean
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import javax.management.OperationsException;
import org.apache.qpid.server.jmx.ManagedObjectRegistry;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.VirtualHost;
import junit.framework.TestCase;
@@ -30,6 +38,8 @@ public class QueueMBeanTest extends Test
{
private static final String QUEUE_NAME = "QUEUE_NAME";
private static final String QUEUE_DESCRIPTION = "QUEUE_DESCRIPTION";
+ private static final String QUEUE_TYPE = "QUEUE_TYPE";
+ private static final String QUEUE_ALTERNATE_EXCHANGE =
"QUEUE_ALTERNATE_EXCHANGE";
private Queue _mockQueue;
private VirtualHostMBean _mockVirtualHostMBean;
@@ -55,10 +65,85 @@ public class QueueMBeanTest extends Test
assertEquals(QUEUE_NAME, _queueMBean.getName());
}
- public void testQueueDescription()
+ public void testGetQueueDescription()
{
when(_mockQueue.getAttribute(Queue.DESCRIPTION)).thenReturn(QUEUE_DESCRIPTION);
assertEquals(QUEUE_DESCRIPTION, _queueMBean.getDescription());
}
+
+ public void testSetQueueDescription()
+ {
+ _queueMBean.setDescription(QUEUE_DESCRIPTION);
+ verify(_mockQueue).setAttribute(Queue.DESCRIPTION, null,
QUEUE_DESCRIPTION);
+ }
+
+ public void testQueueType()
+ {
+ when(_mockQueue.getAttribute(Queue.TYPE)).thenReturn(QUEUE_TYPE);
+
+ assertEquals(QUEUE_TYPE, _queueMBean.getQueueType());
+ }
+
+ public void testGetAlternateExchange()
+ {
+ Exchange mockAlternateExchange = mock(Exchange.class);
+
when(mockAlternateExchange.getName()).thenReturn(QUEUE_ALTERNATE_EXCHANGE);
+
+
when(_mockQueue.getAttribute(Queue.ALTERNATE_EXCHANGE)).thenReturn(mockAlternateExchange);
+
+ assertEquals(QUEUE_ALTERNATE_EXCHANGE,
_queueMBean.getAlternateExchange());
+ }
+
+ public void testGetAlternateExchangeWhenQueueHasNone()
+ {
+
when(_mockQueue.getAttribute(Queue.ALTERNATE_EXCHANGE)).thenReturn(null);
+
+ assertNull(_queueMBean.getAlternateExchange());
+ }
+
+ public void testSetAlternateExchange() throws Exception
+ {
+ Exchange mockExchange1 = mock(Exchange.class);
+ when(mockExchange1.getName()).thenReturn("exchange1");
+
+ Exchange mockExchange2 = mock(Exchange.class);
+ when(mockExchange2.getName()).thenReturn("exchange2");
+
+ Exchange mockExchange3 = mock(Exchange.class);
+ when(mockExchange3.getName()).thenReturn("exchange3");
+
+ VirtualHost mockVirtualHost = mock(VirtualHost.class);
+ when(mockVirtualHost.getExchanges()).thenReturn(Arrays.asList(new
Exchange[] {mockExchange1, mockExchange2, mockExchange3}));
+
when(_mockQueue.getParent(VirtualHost.class)).thenReturn(mockVirtualHost);
+
+ _queueMBean.setAlternateExchange("exchange2");
+ verify(_mockQueue).setAttribute(Queue.ALTERNATE_EXCHANGE, null,
mockExchange2);
+ }
+
+ public void testSetAlternateExchangeWithUnknownExchangeName() throws
Exception
+ {
+ Exchange mockExchange = mock(Exchange.class);
+ when(mockExchange.getName()).thenReturn("exchange1");
+
+ VirtualHost mockVirtualHost = mock(VirtualHost.class);
+
when(mockVirtualHost.getExchanges()).thenReturn(Collections.singletonList(mockExchange));
+
when(_mockQueue.getParent(VirtualHost.class)).thenReturn(mockVirtualHost);
+
+ try
+ {
+ _queueMBean.setAlternateExchange("notknown");
+ fail("Exception not thrown");
+ }
+ catch(OperationsException oe)
+ {
+ // PASS
+ }
+ }
+
+ public void testRemoveAlternateExchange() throws Exception
+ {
+ _queueMBean.setAlternateExchange("");
+ verify(_mockQueue).setAttribute(Queue.ALTERNATE_EXCHANGE, null, null);
+ }
}
Added:
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFinder.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFinder.java?rev=1352946&view=auto
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFinder.java
(added)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFinder.java
Fri Jun 22 16:13:52 2012
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+import java.util.Collection;
+
+public class ConfiguredObjectFinder
+{
+ public static <C extends ConfiguredObject> C
findConfiguredObjectByName(Collection<C> configuredObjects, String name)
+ {
+ for (C configuredObject : configuredObjects)
+ {
+ if (name.equals(configuredObject.getName()))
+ {
+ return configuredObject;
+ }
+ }
+ return null;
+ }
+}
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java?rev=1352946&r1=1352945&r2=1352946&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
Fri Jun 22 16:13:52 2012
@@ -31,6 +31,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ConfiguredObjectFinder;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
@@ -230,7 +231,10 @@ final class QueueAdapter extends Abstrac
}
else if(ALTERNATE_EXCHANGE.equals(name))
{
- // TODO
+ // In future we may want to accept a UUID as an alternative
way of identifying the exchange
+ ExchangeAdapter alternateExchange = (ExchangeAdapter) desired;
+ _queue.setAlternateExchange(alternateExchange == null ? null :
alternateExchange.getExchange());
+ return desired;
}
else if(EXCLUSIVE.equals(name))
{
@@ -339,7 +343,10 @@ final class QueueAdapter extends Abstrac
}
else if(ALTERNATE_EXCHANGE.equals(name))
{
- return _queue.getAlternateExchange() == null ? null :
_queue.getAlternateExchange().getName();
+ org.apache.qpid.server.exchange.Exchange alternateExchange =
_queue.getAlternateExchange();
+ return alternateExchange == null ? null :
+
ConfiguredObjectFinder.findConfiguredObjectByName(_vhost.getExchanges(),
+
alternateExchange.getName());
}
else if(EXCLUSIVE.equals(name))
{
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1352946&r1=1352945&r2=1352946&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
Fri Jun 22 16:13:52 2012
@@ -232,8 +232,6 @@ public interface AMQQueue extends Compar
void setAlternateExchange(Exchange exchange);
- void setAlternateExchange(String exchangeName);
-
Map<String, Object> getArguments();
void checkCapacity(AMQSessionModel channel);
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1352946&r1=1352945&r2=1352946&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
Fri Jun 22 16:13:52 2012
@@ -357,22 +357,6 @@ public class SimpleAMQQueue implements A
_alternateExchange = exchange;
}
- public void setAlternateExchange(String exchangeName)
- {
- if(exchangeName == null || exchangeName.equals(""))
- {
- _alternateExchange = null;
- return;
- }
-
- Exchange exchange =
getVirtualHost().getExchangeRegistry().getExchange(new
AMQShortString(exchangeName));
- if (exchange == null)
- {
- throw new RuntimeException("Exchange '" + exchangeName + "' is not
registered with the VirtualHost.");
- }
- setAlternateExchange(exchange);
- }
-
/**
* Arguments used to create this queue. The caller is assured
* that null will never be returned.
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java?rev=1352946&r1=1352945&r2=1352946&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
Fri Jun 22 16:13:52 2012
@@ -32,7 +32,7 @@ public interface ConfigurationRecoveryHa
public static interface QueueRecoveryHandler
{
- void queue(UUID id, String queueName, String owner, boolean exclusive,
FieldTable arguments);
+ void queue(UUID id, String queueName, String owner, boolean exclusive,
FieldTable arguments, UUID alternateExchangeId);
ExchangeRecoveryHandler completeQueueRecovery();
}
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java?rev=1352946&r1=1352945&r2=1352946&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java
Fri Jun 22 16:13:52 2012
@@ -52,6 +52,7 @@ public class ConfiguredObjectHelper
String queueName = (String) attributeMap.get(Queue.NAME);
String owner = (String) attributeMap.get(Queue.OWNER);
boolean exclusive = (Boolean) attributeMap.get(Queue.EXCLUSIVE);
+ UUID alternateExchangeId =
attributeMap.get(Queue.ALTERNATE_EXCHANGE) == null ? null :
UUID.fromString((String)attributeMap.get(Queue.ALTERNATE_EXCHANGE));
@SuppressWarnings("unchecked")
Map<String, Object> queueArgumentsMap = (Map<String, Object>)
attributeMap.get(Queue.ARGUMENTS);
FieldTable arguments = null;
@@ -59,7 +60,7 @@ public class ConfiguredObjectHelper
{
arguments = FieldTable.convertToFieldTable(queueArgumentsMap);
}
- qrh.queue(configuredObject.getId(), queueName, owner, exclusive,
arguments);
+ qrh.queue(configuredObject.getId(), queueName, owner, exclusive,
arguments, alternateExchangeId);
}
}
@@ -68,6 +69,14 @@ public class ConfiguredObjectHelper
Map<String, Object> attributesMap =
_serializer.deserialize(queueRecord.getAttributes());
attributesMap.put(Queue.NAME, queue.getName());
attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive());
+ if (queue.getAlternateExchange() != null)
+ {
+ attributesMap.put(Queue.ALTERNATE_EXCHANGE,
queue.getAlternateExchange().getId());
+ }
+ else
+ {
+ attributesMap.remove(Queue.ALTERNATE_EXCHANGE);
+ }
if (attributesMap.containsKey(Queue.ARGUMENTS))
{
// We wouldn't need this if createQueueConfiguredObject took only
AMQQueue
@@ -89,6 +98,10 @@ public class ConfiguredObjectHelper
attributesMap.put(Queue.NAME, queue.getName());
attributesMap.put(Queue.OWNER,
AMQShortString.toString(queue.getOwner()));
attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive());
+ if (queue.getAlternateExchange() != null)
+ {
+ attributesMap.put(Queue.ALTERNATE_EXCHANGE,
queue.getAlternateExchange().getId());
+ }
// TODO KW I think the arguments could come from the queue itself,
removing the need for the arguments parameter.
// It would also do away with the need for the if/then/else within
updateQueueConfiguredObject
if (arguments != null)
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1352946&r1=1352945&r2=1352946&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
Fri Jun 22 16:13:52 2012
@@ -100,7 +100,7 @@ public class VirtualHostConfigRecoveryHa
return this;
}
- public void queue(UUID id, String queueName, String owner, boolean
exclusive, FieldTable arguments)
+ public void queue(UUID id, String queueName, String owner, boolean
exclusive, FieldTable arguments, UUID alternateExchangeId)
{
try
{
@@ -111,6 +111,17 @@ public class VirtualHostConfigRecoveryHa
q = AMQQueueFactory.createAMQQueueImpl(id, queueName, true,
owner, false, exclusive, _virtualHost,
FieldTable.convertToMap(arguments));
_virtualHost.getQueueRegistry().registerQueue(q);
+
+ if (alternateExchangeId != null)
+ {
+ Exchange altExchange =
_virtualHost.getExchangeRegistry().getExchange(alternateExchangeId);
+ if (altExchange == null)
+ {
+ _logger.error("Unknown exchange id " +
alternateExchangeId + ", cannot set alternate exchange on queue with id " + id);
+ return;
+ }
+ q.setAlternateExchange(altExchange);
+ }
}
CurrentActor.get().message(_logSubject,
TransactionLogMessages.RECOVERY_START(queueName, true));
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=1352946&r1=1352945&r2=1352946&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
Fri Jun 22 16:13:52 2012
@@ -629,10 +629,6 @@ public class MockAMQQueue implements AMQ
{
}
- public void setAlternateExchange(String exchangeName)
- {
- }
-
public void visit(final QueueEntryVisitor visitor)
{
}
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java?rev=1352946&r1=1352945&r2=1352946&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java
Fri Jun 22 16:13:52 2012
@@ -192,7 +192,7 @@ public class DurableConfigurationStoreTe
_store.createQueue(queue);
reopenStore();
- verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() +
"Owner", true, null);
+ verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() +
"Owner", true, null, null);
}
public void testCreateQueueAMQQueueFieldTable() throws Exception
@@ -206,10 +206,29 @@ public class DurableConfigurationStoreTe
_store.createQueue(queue, arguments);
reopenStore();
- verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() +
"Owner", true, arguments);
+ verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() +
"Owner", true, arguments, null);
}
- public void testUpdateQueue() throws Exception
+ public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception
+ {
+ Exchange alternateExchange = createTestAlternateExchange();
+
+ AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true,
alternateExchange);
+ _store.createQueue(queue);
+
+ reopenStore();
+ verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() +
"Owner", true, null, alternateExchange.getId());
+ }
+
+ private Exchange createTestAlternateExchange()
+ {
+ UUID exchUuid = UUID.randomUUID();
+ Exchange alternateExchange = mock(Exchange.class);
+ when(alternateExchange.getId()).thenReturn(exchUuid);
+ return alternateExchange;
+ }
+
+ public void testUpdateQueueExclusivity() throws Exception
{
// create queue
AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
@@ -224,7 +243,26 @@ public class DurableConfigurationStoreTe
_store.updateQueue(queue);
reopenStore();
- verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() +
"Owner", false, arguments);
+ verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() +
"Owner", false, arguments, null);
+ }
+
+ public void testUpdateQueueAlternateExchange() throws Exception
+ {
+ // create queue
+ AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
+ Map<String, Object> attributes = new HashMap<String, Object>();
+ attributes.put("x-qpid-dlq-enabled", Boolean.TRUE);
+ attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
+ FieldTable arguments = FieldTable.convertToFieldTable(attributes);
+ _store.createQueue(queue, arguments);
+
+ // update the queue to have an alternate exchange (and exclusive=false)
+ Exchange alternateExchange = createTestAlternateExchange();
+ queue = createTestQueue(getName(), getName() + "Owner", false,
alternateExchange);
+ _store.updateQueue(queue);
+
+ reopenStore();
+ verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() +
"Owner", false, arguments, alternateExchange.getId());
}
public void testRemoveQueue() throws Exception
@@ -241,17 +279,23 @@ public class DurableConfigurationStoreTe
_store.removeQueue(queue);
reopenStore();
verify(_queueRecoveryHandler, never()).queue(any(UUID.class),
anyString(), anyString(), anyBoolean(),
- any(FieldTable.class));
+ any(FieldTable.class), any(UUID.class));
}
private AMQQueue createTestQueue(String queueName, String queueOwner,
boolean exclusive) throws AMQStoreException
{
+ return createTestQueue(queueName, queueOwner, exclusive, null);
+ }
+
+ private AMQQueue createTestQueue(String queueName, String queueOwner,
boolean exclusive, Exchange alternateExchange) throws AMQStoreException
+ {
AMQQueue queue = mock(AMQQueue.class);
when(queue.getName()).thenReturn(queueName);
when(queue.getNameShortString()).thenReturn(AMQShortString.valueOf(queueName));
when(queue.getOwner()).thenReturn(AMQShortString.valueOf(queueOwner));
when(queue.isExclusive()).thenReturn(exclusive);
when(queue.getId()).thenReturn(_queueId);
+ when(queue.getAlternateExchange()).thenReturn(alternateExchange);
return queue;
}
Modified:
qpid/branches/java-config-and-management/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java?rev=1352946&r1=1352945&r2=1352946&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
Fri Jun 22 16:13:52 2012
@@ -359,7 +359,7 @@ public interface ManagedQueue
* @since Qpid JMX API 2.0
* @param exclusive whether the queue is to be exclusive
* @throws IOException
- * @throws JMException
+ * @throws JMException
*/
@MBeanAttribute(name="Exclusive", description="Whether the queue is
Exclusive or not")
void setExclusive(boolean exclusive) throws IOException, JMException;
@@ -371,8 +371,10 @@ public interface ManagedQueue
* @param exchangeName the name of the exchange to use. Specifying null or
the empty string will clear the
* alternate exchange.
* @throws IOException
+ * @throws JMException
*/
- void setAlternateExchange(String exchangeName) throws IOException;
+ @MBeanAttribute(name="AlternateExchange", description="Alternate exchange
for the queue")
+ void setAlternateExchange(String exchangeName) throws IOException,
JMException;
/**
* Returns the name of the Alternate Exchange for the queue, or null if
there isn't one.
@@ -381,7 +383,6 @@ public interface ManagedQueue
* @return the name of the Alternate Exchange for the queue, or null if
there isn't one
* @throws IOException
*/
- @MBeanAttribute(name="AlternateExchange", description="Alternate exchange
for the queue")
String getAlternateExchange() throws IOException;
//********** Operations *****************//
Modified:
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java?rev=1352946&r1=1352945&r2=1352946&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java
Fri Jun 22 16:13:52 2012
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.AMQShortS
import org.apache.qpid.management.common.mbeans.ManagedBroker;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.test.client.destination.AddressBasedDestinationTest;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -168,13 +169,11 @@ public class ManagedQueueMBeanTest exten
assertEquals(TEST_QUEUE_DESCRIPTION, managedQueue.getDescription());
}
+ /**
+ * Requires persistent store.
+ */
public void testQueueDescriptionSurvivesRestart() throws Exception
{
- if (!isBrokerStorePersistent())
- {
- return;
- }
-
String queueName = getTestQueueName();
Map<String, Object> arguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_QUEUE_DESCRIPTION);
@@ -211,6 +210,82 @@ public class ManagedQueueMBeanTest exten
}
/**
+ * Requires 0-10 as relies on ADDR addresses.
+ * @see AddressBasedDestinationTest for the testing of message routing to the alternate exchange
+ */
+ public void testGetSetAlternateExchange() throws Exception
+ {
+ String queueName = getTestQueueName();
+ String altExchange = "amq.fanout";
+ String addrWithAltExch = String.format("ADDR:%s;{create:always,node:{type:queue,x-declare:{alternate-exchange:'%s'}}}", queueName, altExchange);
+ Queue queue = _session.createQueue(addrWithAltExch);
+
+ createQueueOnBroker(queue);
+
+ final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
+ assertEquals("Newly created queue does not have expected alternate exchange", altExchange, managedQueue.getAlternateExchange());
+
+ String newAltExch = "amq.topic";
+ managedQueue.setAlternateExchange(newAltExch);
+ assertEquals("Unexpected alternate exchange after set", newAltExch, managedQueue.getAlternateExchange());
+ }
+
+ /**
+ * Requires 0-10 as relies on ADDR addresses.
+ */
+ public void testRemoveAlternateExchange() throws Exception
+ {
+ String queueName = getTestQueueName();
+ String altExchange = "amq.fanout";
+ String addrWithAltExch = String.format("ADDR:%s;{create:always,node:{type:queue,x-declare:{alternate-exchange:'%s'}}}", queueName, altExchange);
+ Queue queue = _session.createQueue(addrWithAltExch);
+
+ createQueueOnBroker(queue);
+
+ final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
+ assertEquals("Newly created queue does not have expected alternate exchange", altExchange, managedQueue.getAlternateExchange());
+
+ managedQueue.setAlternateExchange("");
+ assertNull("Unexpected alternate exchange after set", managedQueue.getAlternateExchange());
+ }
+
+ /**
+ * Requires persistent store
+ * Requires 0-10 as relies on ADDR addresses.
+ */
+ public void testAlternateExchangeSurvivesRestart() throws Exception
+ {
+ String queueName1 = getTestQueueName() + "1";
+ String altExchange1 = "amq.fanout";
+ String addr1WithAltExch = String.format("ADDR:%s;{create:always,node:{durable: true,type:queue,x-declare:{alternate-exchange:'%s'}}}", queueName1, altExchange1);
+ Queue queue1 = _session.createQueue(addr1WithAltExch);
+
+ String queueName2 = getTestQueueName() + "2";
+ String addr2WithoutAltExch = String.format("ADDR:%s;{create:always,node:{durable: true,type:queue,}}", queueName2);
+ Queue queue2 = _session.createQueue(addr2WithoutAltExch);
+
+ createQueueOnBroker(queue1);
+ createQueueOnBroker(queue2);
+
+ ManagedQueue managedQueue1 = _jmxUtils.getManagedQueue(queueName1);
+ assertEquals("Newly created queue1 does not have expected alternate exchange", altExchange1, managedQueue1.getAlternateExchange());
+
+ ManagedQueue managedQueue2 = _jmxUtils.getManagedQueue(queueName2);
+ assertNull("Newly created queue2 does not have expected alternate exchange", managedQueue2.getAlternateExchange());
+
+ String altExchange2 = "amq.fanout";
+ managedQueue2.setAlternateExchange(altExchange2);
+
+ restartBroker();
+
+ managedQueue1 = _jmxUtils.getManagedQueue(queueName1);
+ assertEquals("Queue1 does not have expected alternate exchange after restart", altExchange1, managedQueue1.getAlternateExchange());
+
+ managedQueue2 = _jmxUtils.getManagedQueue(queueName2);
+ assertEquals("Queue2 does not have expected updated alternate exchange after restart", altExchange2, managedQueue2.getAlternateExchange());
+ }
+
+ /**
* Tests {@link ManagedQueue#viewMessages(long, long)} interface.
*/
public void testViewSingleMessage() throws Exception
Modified:
qpid/branches/java-config-and-management/qpid/java/test-profiles/JavaPre010Excludes
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/test-profiles/JavaPre010Excludes?rev=1352946&r1=1352945&r2=1352946&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/test-profiles/JavaPre010Excludes (original)
+++ qpid/branches/java-config-and-management/qpid/java/test-profiles/JavaPre010Excludes Fri Jun 22 16:13:52 2012
@@ -70,3 +70,8 @@ org.apache.qpid.test.unit.ct.DurableSubs
org.apache.qpid.ra.QpidRAConnectionTest#*
org.apache.qpid.ra.QpidRAXAResourceTest#*
+// These tests rely on new address syntax
+org.apache.qpid.management.jmx.ManagedQueueMBeanTest#testGetSetAlternateExchange
+org.apache.qpid.management.jmx.ManagedQueueMBeanTest#testRemoveAlternateExchange
+org.apache.qpid.management.jmx.ManagedQueueMBeanTest#testAlternateExchangeSurvivesRestart
+
Modified:
qpid/branches/java-config-and-management/qpid/java/test-profiles/JavaTransientExcludes
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/test-profiles/JavaTransientExcludes?rev=1352946&r1=1352945&r2=1352946&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/test-profiles/JavaTransientExcludes (original)
+++ qpid/branches/java-config-and-management/qpid/java/test-profiles/JavaTransientExcludes Fri Jun 22 16:13:52 2012
@@ -42,3 +42,6 @@ org.apache.qpid.server.store.MessageStor
org.apache.qpid.server.store.berkeleydb.*
org.apache.qpid.server.store.DurableConfigurationStoreTest#*
+
+org.apache.qpid.management.jmx.ManagedQueueMBeanTest#testAlternateExchangeSurvivesRestart
+org.apache.qpid.management.jmx.ManagedQueueMBeanTest#testQueueDescriptionSurvivesRestart
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]