Author: kwall
Date: Tue Jul 7 21:07:31 2015
New Revision: 1689742
URL: http://svn.apache.org/r1689742
Log:
QPID-6619: [Java Broker] Prevent the protocol layers observing an exchange or
queue object before it has attained state.
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/hierarchy/AbstractConfiguredObjectTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java?rev=1689742&r1=1689741&r2=1689742&view=diff
==============================================================================
---
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
(original)
+++
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
Tue Jul 7 21:07:31 2015
@@ -54,7 +54,6 @@ import org.apache.qpid.server.stats.Stat
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.transport.AMQPConnection;
-import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.virtualhost.*;
@@ -165,13 +164,13 @@ public class BDBHAReplicaVirtualHostImpl
}
@Override
- public MessageDestination getMessageDestination(final String name)
+ public MessageDestination getAttainedMessageDestination(final String name)
{
return null;
}
@Override
- public ExchangeImpl<?> getExchange(final String name)
+ public ExchangeImpl<?> getAttainedExchange(final String name)
{
return null;
}
@@ -310,19 +309,19 @@ public class BDBHAReplicaVirtualHostImpl
}
@Override
- public AMQQueue<?> getQueue(final String name)
+ public AMQQueue<?> getAttainedQueue(final String name)
{
return null;
}
@Override
- public MessageSource getMessageSource(final String name)
+ public MessageSource getAttainedMessageSource(final String name)
{
return null;
}
@Override
- public AMQQueue<?> getQueue(final UUID id)
+ public AMQQueue<?> getAttainedQueue(final UUID id)
{
return null;
}
@@ -351,12 +350,6 @@ public class BDBHAReplicaVirtualHostImpl
{
return null;
}
-
- @Override
- public ExchangeImpl<?> getExchange(final UUID id)
- {
- return null;
- }
@Override
public MessageDestination getDefaultDestination()
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java?rev=1689742&r1=1689741&r2=1689742&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
Tue Jul 7 21:07:31 2015
@@ -58,14 +58,14 @@ public class DefaultDestination implemen
{
routingAddress = "";
}
- final AMQQueue q = _virtualHost.getQueue(routingAddress);
+ final AMQQueue q = _virtualHost.getAttainedQueue(routingAddress);
if(q == null)
{
routingAddress = _virtualHost.getLocalAddress(routingAddress);
if(routingAddress.contains("/") && !routingAddress.startsWith("/"))
{
String[] parts = routingAddress.split("/",2);
- ExchangeImpl exchange = _virtualHost.getExchange(parts[0]);
+ ExchangeImpl exchange =
_virtualHost.getAttainedExchange(parts[0]);
if(exchange != null)
{
return exchange.send(message, parts[1],
instanceProperties, txn, postEnqueueAction);
@@ -73,7 +73,7 @@ public class DefaultDestination implemen
}
else if(!routingAddress.contains("/"))
{
- ExchangeImpl exchange =
_virtualHost.getExchange(routingAddress);
+ ExchangeImpl exchange =
_virtualHost.getAttainedExchange(routingAddress);
if(exchange != null)
{
return exchange.send(message, "", instanceProperties, txn,
postEnqueueAction);
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1689742&r1=1689741&r2=1689742&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
Tue Jul 7 21:07:31 2015
@@ -45,9 +45,12 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -106,18 +109,18 @@ public abstract class AbstractConfigured
- private final Map<String,Object> _attributes = new HashMap<String,
Object>();
+ private final Map<String,Object> _attributes = new HashMap<>();
private final Map<Class<? extends ConfiguredObject>, ConfiguredObject>
_parents =
- new HashMap<Class<? extends ConfiguredObject>, ConfiguredObject>();
+ new HashMap<>();
private final Collection<ConfigurationChangeListener> _changeListeners =
- new ArrayList<ConfigurationChangeListener>();
+ new ArrayList<>();
private final Map<Class<? extends ConfiguredObject>,
Collection<ConfiguredObject<?>>> _children =
- new ConcurrentHashMap<Class<? extends ConfiguredObject>,
Collection<ConfiguredObject<?>>>();
- private final Map<Class<? extends ConfiguredObject>,
Map<UUID,ConfiguredObject<?>>> _childrenById =
- new ConcurrentHashMap<Class<? extends ConfiguredObject>,
Map<UUID,ConfiguredObject<?>>>();
- private final Map<Class<? extends ConfiguredObject>,
Map<String,ConfiguredObject<?>>> _childrenByName =
- new ConcurrentHashMap<Class<? extends ConfiguredObject>,
Map<String,ConfiguredObject<?>>>();
+ new ConcurrentHashMap<>();
+ private final Map<Class<? extends ConfiguredObject>,
ConcurrentMap<UUID,ConfiguredObject<?>>> _childrenById =
+ new ConcurrentHashMap<>();
+ private final Map<Class<? extends ConfiguredObject>,
ConcurrentMap<String,ConfiguredObject<?>>> _childrenByName =
+ new ConcurrentHashMap<>();
@ManagedAttributeField
@@ -160,6 +163,7 @@ public abstract class AbstractConfigured
private LifetimePolicy _lifetimePolicy;
private final Map<String, ConfiguredObjectAttribute<?,?>> _attributeTypes;
+
private final Map<String, ConfiguredObjectTypeRegistry.AutomatedField>
_automatedFields;
private final Map<State, Map<State, Method>> _stateChangeMethods;
@@ -172,6 +176,9 @@ public abstract class AbstractConfigured
@ManagedAttributeField
private State _desiredState;
+
+
+ private volatile SettableFuture<ConfiguredObject<X>> _attainStateFuture =
SettableFuture.create();
private boolean _openComplete;
private boolean _openFailed;
private volatile State _state = State.UNINITIALIZED;
@@ -1166,6 +1173,12 @@ public abstract class AbstractConfigured
State currentState = getState();
State desiredState = getDesiredState();
ListenableFuture<Void> returnVal;
+
+ if (_attainStateFuture.isDone())
+ {
+ _attainStateFuture = SettableFuture.create();
+ }
+
if(currentState != desiredState)
{
Method stateChangingMethod = getStateChangeMethod(currentState,
desiredState);
@@ -1174,8 +1187,17 @@ public abstract class AbstractConfigured
try
{
returnVal = (ListenableFuture<Void>)
stateChangingMethod.invoke(this);
+ doAfter(returnVal, new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ _attainStateFuture.set(AbstractConfiguredObject.this);
+ }
+ });
if(getState() != currentState)
{
+ // TODO - KW - shouldn't I be done after too???
notifyStateChanged(currentState, getState());
}
}
@@ -1200,11 +1222,13 @@ public abstract class AbstractConfigured
else
{
returnVal = Futures.immediateFuture(null);
+ _attainStateFuture.set(this);
}
}
else
{
returnVal = Futures.immediateFuture(null);
+ _attainStateFuture.set(this);
}
return returnVal;
}
@@ -1724,15 +1748,17 @@ public abstract class AbstractConfigured
Class categoryClass = child.getCategoryClass();
UUID childId = child.getId();
String name = child.getName();
- if(_childrenById.get(categoryClass).containsKey(childId))
+ ConfiguredObject<?> existingWithSameId =
_childrenById.get(categoryClass).get(childId);
+ if(existingWithSameId != null)
{
- throw new DuplicateIdException(child);
+ throw new DuplicateIdException(existingWithSameId);
}
if(getModel().getParentTypes(categoryClass).size() == 1)
{
- if (_childrenByName.get(categoryClass).containsKey(name))
+ ConfiguredObject<?> existingWithSameName =
_childrenByName.get(categoryClass).putIfAbsent(name, child);
+ if (existingWithSameName != null)
{
- throw new DuplicateNameException(child);
+ throw new DuplicateNameException(existingWithSameName);
}
_childrenByName.get(categoryClass).put(name, child);
}
@@ -1784,6 +1810,39 @@ public abstract class AbstractConfigured
}
}
+ protected final <R> R doSync(ListenableFuture<R> async, long timeout,
TimeUnit units) throws TimeoutException
+ {
+ try
+ {
+ return async.get(timeout, units);
+ }
+ catch (InterruptedException e)
+ {
+ throw new ServerScopedRuntimeException(e);
+ }
+ catch (ExecutionException e)
+ {
+ Throwable cause = e.getCause();
+ if(cause instanceof RuntimeException)
+ {
+ throw (RuntimeException) cause;
+ }
+ else if(cause instanceof Error)
+ {
+ throw (Error) cause;
+ }
+ else if(cause != null)
+ {
+ throw new ServerScopedRuntimeException(cause);
+ }
+ else
+ {
+ throw new ServerScopedRuntimeException(e);
+ }
+
+ }
+ }
+
public final ListenableFuture<Void> deleteAsync()
{
return setDesiredState(State.DELETED);
@@ -1858,6 +1917,41 @@ public abstract class AbstractConfigured
}
@Override
+ public <C extends ConfiguredObject> ListenableFuture<C>
getAttainedChildByName(final Class<C> childClass,
+
final String name)
+ {
+ C child = getChildByName(childClass, name);
+ if (child instanceof AbstractConfiguredObject)
+ {
+ return ((AbstractConfiguredObject)child).getAttainStateFuture();
+ }
+ else
+ {
+ return Futures.immediateFuture(child);
+ }
+ }
+
+ @Override
+ public <C extends ConfiguredObject> ListenableFuture<C>
getAttainedChildById(final Class<C> childClass,
+
final UUID id)
+ {
+ C child = getChildById(childClass, id);
+ if (child instanceof AbstractConfiguredObject)
+ {
+ return ((AbstractConfiguredObject)child).getAttainStateFuture();
+ }
+ else
+ {
+ return Futures.immediateFuture(child);
+ }
+ }
+
+ private <C extends ConfiguredObject> ListenableFuture<C>
getAttainStateFuture()
+ {
+ return (ListenableFuture<C>) _attainStateFuture;
+ }
+
+ @Override
public final TaskExecutor getTaskExecutor()
{
return _taskExecutor;
@@ -2641,24 +2735,29 @@ public abstract class AbstractConfigured
public final static class DuplicateIdException extends
IllegalArgumentException
{
- private DuplicateIdException(final ConfiguredObject<?> child)
+ private DuplicateIdException(final ConfiguredObject<?> existing)
{
- super("Child of type " + child.getClass().getSimpleName() + " already exists with id of " + child.getId());
+ super("Child of type " + existing.getClass().getSimpleName() + " already exists with id of " + existing.getId());
}
}
public final static class DuplicateNameException extends
IllegalArgumentException
{
- private final String _name;
- private DuplicateNameException(final ConfiguredObject<?> child)
+ private final ConfiguredObject<?> _existing;
+ private DuplicateNameException(final ConfiguredObject<?> existing)
{
- super("Child of type " + child.getClass().getSimpleName() + " already exists with name of " + child.getName());
- _name = child.getName();
+ super("Child of type " + existing.getClass().getSimpleName() + " already exists with name of " + existing.getName());
+ _existing = existing;
}
public String getName()
{
- return _name;
+ return _existing.getName();
+ }
+
+ public ConfiguredObject<?> getExisting()
+ {
+ return _existing;
}
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java?rev=1689742&r1=1689741&r2=1689742&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
Tue Jul 7 21:07:31 2015
@@ -239,6 +239,12 @@ public interface ConfiguredObject<X exte
<C extends ConfiguredObject> C createChild(Class<C> childClass,
Map<String, Object> attributes,
ConfiguredObject...
otherParents);
+
+ <C extends ConfiguredObject> ListenableFuture<C>
getAttainedChildById(Class<C> childClass,
+ UUID
id);
+
+ <C extends ConfiguredObject> ListenableFuture<C>
getAttainedChildByName(Class<C> childClass, String name);
+
<C extends ConfiguredObject> ListenableFuture<C> createChildAsync(Class<C>
childClass,
Map<String, Object> attributes,
ConfiguredObject... otherParents);
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1689742&r1=1689741&r2=1689742&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
Tue Jul 7 21:07:31 2015
@@ -106,6 +106,9 @@ public interface VirtualHost<X extends V
@ManagedContextDefault( name = "virtualhost.connectionThreadCount")
public static final int DEFAULT_CONNECTION_THREAD_COUNT =
Runtime.getRuntime().availableProcessors();
+ @ManagedContextDefault( name = "virtualhost.awaitAttainmentTimeout")
+ public static final int DEFAULT_AWAIT_ATTAINMENT_TIMEOUT = 5000;
+
@ManagedAttribute( defaultValue = "${virtualhost.connectionThreadCount}")
int getConnectionThreadCount();
@@ -200,8 +203,6 @@ public interface VirtualHost<X extends V
void executeTransaction(TransactionalOperation op);
- E getExchange(UUID id);
-
MessageStore getMessageStore();
String getType();
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1689742&r1=1689741&r2=1689742&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
Tue Jul 7 21:07:31 2015
@@ -655,37 +655,37 @@ public abstract class AbstractVirtualHos
}
@Override
- public AMQQueue<?> getQueue(String name)
+ public AMQQueue<?> getAttainedQueue(String name)
{
- AMQQueue<?> childByName = (AMQQueue<?>) getChildByName(Queue.class,
name);
- if(childByName == null && getGlobalAddressDomains() != null)
+ Queue child = awaitChildClassToAttainState(Queue.class, name);
+ if(child == null && getGlobalAddressDomains() != null)
{
for(String domain : getGlobalAddressDomains())
{
if(name.startsWith(domain + "/"))
{
- childByName = (AMQQueue<?>)
getChildByName(Queue.class,name.substring(domain.length()));
- if(childByName != null)
+ child = awaitChildClassToAttainState(Queue.class,
name.substring(domain.length()));
+ if(child != null)
{
break;
}
}
}
}
- return childByName;
+ return (AMQQueue<?>) child;
}
@Override
- public MessageSource getMessageSource(final String name)
+ public MessageSource getAttainedMessageSource(final String name)
{
MessageSource systemSource = _systemNodeSources.get(name);
- return systemSource == null ? getQueue(name) : systemSource;
+ return systemSource == null ? (MessageSource)
awaitChildClassToAttainState(Queue.class, name) : systemSource;
}
@Override
- public AMQQueue<?> getQueue(UUID id)
+ public AMQQueue<?> getAttainedQueue(UUID id)
{
- return (AMQQueue<?>) getChildById(Queue.class, id);
+ return (AMQQueue<?>) awaitChildClassToAttainState(Queue.class, id);
}
@Override
@@ -736,44 +736,67 @@ public abstract class AbstractVirtualHos
}
catch (DuplicateNameException e)
{
- throw new QueueExistsException(String.format("Queue with name '%s' already exists", e.getName()), getQueue(e.getName()));
+ throw new QueueExistsException(String.format("Queue with name '%s' already exists", e.getName()),
+ (AMQQueue) e.getExisting());
}
}
@Override
- public MessageDestination getMessageDestination(final String name)
+ public MessageDestination getAttainedMessageDestination(final String name)
{
MessageDestination destination = _systemNodeDestinations.get(name);
- return destination == null ? getExchange(name) : destination;
+ return destination == null ? getAttainedExchange(name) : destination;
}
@Override
- public ExchangeImpl getExchange(String name)
+ public ExchangeImpl getAttainedExchange(String name)
{
- ExchangeImpl childByName = getChildByName(ExchangeImpl.class, name);
- if(childByName == null && getGlobalAddressDomains() != null)
+ Exchange child = awaitChildClassToAttainState(Exchange.class, name);
+ if(child == null && getGlobalAddressDomains() != null)
{
for(String domain : getGlobalAddressDomains())
{
if(name.startsWith(domain + "/"))
{
- childByName =
getChildByName(ExchangeImpl.class,name.substring(domain.length()));
- if(childByName != null)
+ child = awaitChildClassToAttainState(Exchange.class,
name.substring(domain.length()));
+ if(child != null)
{
break;
}
}
}
}
- return childByName;
+ return (ExchangeImpl) child;
}
- @Override
- public ExchangeImpl getExchange(UUID id)
+ private <C extends ConfiguredObject> C awaitChildClassToAttainState(final
Class<C> childClass, final String name)
+ {
+ ListenableFuture<C> attainedChildByName =
getAttainedChildByName(childClass, name);
+ try
+ {
+ return (C) doSync(attainedChildByName,
DEFAULT_AWAIT_ATTAINMENT_TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+ catch (TimeoutException e)
+ {
+ _logger.warn("Gave up waiting for {} '{}' to attain state. Check object's state via Management.", childClass.getSimpleName(), name);
+ return null;
+ }
+ }
+
+ private <C extends ConfiguredObject> C awaitChildClassToAttainState(final
Class<C> childClass, final UUID id)
{
- return getChildById(ExchangeImpl.class, id);
+ ListenableFuture<C> attainedChildByName =
getAttainedChildById(childClass, id);
+ try
+ {
+ return (C) doSync(attainedChildByName,
DEFAULT_AWAIT_ATTAINMENT_TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+ catch (TimeoutException e)
+ {
+ _logger.warn("Gave up waiting for {} with ID {} to attain state. Check object's state via Management.", childClass.getSimpleName(), id);
+ return null;
+ }
}
@Override
@@ -818,7 +841,8 @@ public abstract class AbstractVirtualHos
{
if(t instanceof DuplicateNameException)
{
- returnVal.setException(new
ExchangeExistsException(getExchange(((DuplicateNameException)t).getName())));
+ DuplicateNameException dne =
(DuplicateNameException) t;
+ returnVal.setException(new
ExchangeExistsException((ExchangeImpl) dne.getExisting()));
}
else
{
@@ -1539,7 +1563,7 @@ public abstract class AbstractVirtualHos
AMQQueue dlQueue = null;
{
- dlQueue = getQueue(dlQueueName);
+ dlQueue = (AMQQueue) getChildByName(Queue.class, dlQueueName);
if(dlQueue == null)
{
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java?rev=1689742&r1=1689741&r2=1689742&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
Tue Jul 7 21:07:31 2015
@@ -260,7 +260,7 @@ public class AsynchronousMessageStoreRec
}
for (Transaction.EnqueueRecord record : enqueues)
{
- final AMQQueue<?> queue =
getVirtualHost().getQueue(record.getResource().getId());
+ final AMQQueue<?> queue =
getVirtualHost().getAttainedQueue(record.getResource().getId());
if (queue != null)
{
final long messageId =
record.getMessage().getMessageNumber();
@@ -320,7 +320,7 @@ public class AsynchronousMessageStoreRec
for (Transaction.DequeueRecord record : dequeues)
{
- final AMQQueue<?> queue =
getVirtualHost().getQueue(record.getEnqueueRecord().getQueueId());
+ final AMQQueue<?> queue =
getVirtualHost().getAttainedQueue(record.getEnqueueRecord().getQueueId());
if (queue != null)
{
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java?rev=1689742&r1=1689741&r2=1689742&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
Tue Jul 7 21:07:31 2015
@@ -82,7 +82,7 @@ public class SynchronousMessageStoreReco
{
eventLogger.message(logSubject,
TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey()));
eventLogger.message(logSubject,
TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true));
- virtualHost.getQueue(entry.getKey()).completeRecovery();
+ virtualHost.getAttainedQueue(entry.getKey()).completeRecovery();
}
Collection<AMQQueue> allQueues = virtualHost.getQueues();
@@ -178,7 +178,7 @@ public class SynchronousMessageStoreReco
{
final UUID queueId = record.getQueueId();
long messageId = record.getMessageNumber();
- AMQQueue<?> queue = _virtualHost.getQueue(queueId);
+ AMQQueue<?> queue = _virtualHost.getAttainedQueue(queueId);
if(queue != null)
{
String queueName = queue.getName();
@@ -262,7 +262,7 @@ public class SynchronousMessageStoreReco
}
for(EnqueueRecord record : enqueues)
{
- final AMQQueue<?> queue =
_virtualHost.getQueue(record.getResource().getId());
+ final AMQQueue<?> queue =
_virtualHost.getAttainedQueue(record.getResource().getId());
if(queue != null)
{
final long messageId =
record.getMessage().getMessageNumber();
@@ -318,7 +318,7 @@ public class SynchronousMessageStoreReco
}
for(Transaction.DequeueRecord record : dequeues)
{
- final AMQQueue<?> queue =
_virtualHost.getQueue(record.getEnqueueRecord().getQueueId());
+ final AMQQueue<?> queue =
_virtualHost.getAttainedQueue(record.getEnqueueRecord().getQueueId());
if(queue != null)
{
final long messageId =
record.getEnqueueRecord().getMessageNumber();
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1689742&r1=1689741&r2=1689742&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
Tue Jul 7 21:07:31 2015
@@ -51,10 +51,10 @@ public interface VirtualHostImpl< X exte
String getName();
- Q getQueue(String name);
- MessageSource getMessageSource(String name);
+ Q getAttainedQueue(String name);
+ Q getAttainedQueue(UUID id);
- Q getQueue(UUID id);
+ MessageSource getAttainedMessageSource(String name);
Collection<Q> getQueues();
@@ -69,10 +69,9 @@ public interface VirtualHostImpl< X exte
void removeExchange(E exchange, boolean force) throws
ExchangeIsAlternateException,
RequiredExchangeException;
- MessageDestination getMessageDestination(String name);
+ E getAttainedExchange(String name);
- E getExchange(String name);
- E getExchange(UUID id);
+ MessageDestination getAttainedMessageDestination(String name);
MessageDestination getDefaultDestination();
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java?rev=1689742&r1=1689741&r2=1689742&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
Tue Jul 7 21:07:31 2015
@@ -55,7 +55,6 @@ import org.apache.qpid.server.stats.Stat
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.transport.AMQPConnection;
-import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.virtualhost.*;
@@ -166,13 +165,13 @@ class RedirectingVirtualHostImpl
}
@Override
- public MessageDestination getMessageDestination(final String name)
+ public MessageDestination getAttainedMessageDestination(final String name)
{
return null;
}
@Override
- public ExchangeImpl<?> getExchange(final String name)
+ public ExchangeImpl<?> getAttainedExchange(final String name)
{
return null;
}
@@ -311,19 +310,19 @@ class RedirectingVirtualHostImpl
}
@Override
- public AMQQueue<?> getQueue(final String name)
+ public AMQQueue<?> getAttainedQueue(final String name)
{
return null;
}
@Override
- public MessageSource getMessageSource(final String name)
+ public MessageSource getAttainedMessageSource(final String name)
{
return null;
}
@Override
- public AMQQueue<?> getQueue(final UUID id)
+ public AMQQueue<?> getAttainedQueue(final UUID id)
{
return null;
}
@@ -355,12 +354,6 @@ class RedirectingVirtualHostImpl
}
@Override
- public ExchangeImpl<?> getExchange(final UUID id)
- {
- return null;
- }
-
- @Override
public MessageDestination getDefaultDestination()
{
return null;
@@ -578,4 +571,5 @@ class RedirectingVirtualHostImpl
{
}
+
}
Modified:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1689742&r1=1689741&r2=1689742&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
(original)
+++
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
Tue Jul 7 21:07:31 2015
@@ -111,49 +111,14 @@ public class MockConsumer implements Con
return tag;
}
- public FilterManager getFilters()
- {
- if(_messageIds != null)
- {
- FilterManager filters = new FilterManager();
- MessageFilter filter = new MessageFilter()
- {
- @Override
- public String getName()
- {
- return "";
- }
-
- @Override
- public boolean startAtTail()
- {
- return false;
- }
-
- @Override
- public boolean matches(final Filterable message)
- {
- final String messageId =
message.getMessageHeader().getMessageId();
- return _messageIds.contains(messageId);
- }
- };
- filters.add(filter.getName(), filter);
- return filters;
- }
- else
- {
- return null;
- }
- }
-
public long getUnacknowledgedBytes()
{
- return 0; // TODO - Implement
+ return 0;
}
public long getUnacknowledgedMessages()
{
- return 0; // TODO - Implement
+ return 0;
}
public AMQQueue getQueue()
@@ -540,526 +505,4 @@ public class MockConsumer implements Con
}
}
- private static class MockConnectionModel implements
AMQPConnection<MockConnectionModel>
- {
-
- @Override
- public void registerMessageReceived(long messageSize, long timestamp)
- {
- }
-
- @Override
- public void registerMessageDelivered(long messageSize)
- {
- }
-
- @Override
- public void closeAsync(AMQConstant cause, String message)
- {
- }
-
- @Override
- public void closeSessionAsync(AMQSessionModel<?> session, AMQConstant
cause,
- String message)
- {
- }
-
- @Override
- public long getConnectionId()
- {
- return 0;
- }
-
- @Override
- public void block()
- {
- }
-
- @Override
- public void unblock()
- {
-
- }
-
- @Override
- public String getRemoteAddressString()
- {
- return "remoteAddress:1234";
- }
-
- public SocketAddress getRemoteSocketAddress()
- {
- return null;
- }
-
- @Override
- public String getClientId()
- {
- return null;
- }
-
- @Override
- public String getRemoteContainerName()
- {
- return null;
- }
-
- @Override
- public void notifyWork()
- {
-
- }
-
- @Override
- public boolean isMessageAssignmentSuspended()
- {
- return false;
- }
-
- @Override
- public boolean hasSessionWithName(final byte[] name)
- {
- return false;
- }
-
- @Override
- public void setScheduler(final NetworkConnectionScheduler
networkConnectionScheduler)
- {
-
- }
-
- @Override
- public String getClientVersion()
- {
- return null;
- }
-
- @Override
- public boolean isIncoming()
- {
- return false;
- }
-
- @Override
- public String getLocalAddress()
- {
- return null;
- }
-
- @Override
- public String getPrincipal()
- {
- return null;
- }
-
- @Override
- public String getRemoteAddress()
- {
- return null;
- }
-
- @Override
- public String getRemoteProcessName()
- {
- return null;
- }
-
- @Override
- public String getRemoteProcessPid()
- {
- return null;
- }
-
- @Override
- public long getSessionCountLimit()
- {
- return 0;
- }
-
- @Override
- public Principal getAuthorizedPrincipal()
- {
- return null;
- }
-
- @Override
- public AmqpPort<?> getPort()
- {
- return null;
- }
-
- @Override
- public long getBytesIn()
- {
- return 0;
- }
-
- @Override
- public long getBytesOut()
- {
- return 0;
- }
-
- @Override
- public long getMessagesIn()
- {
- return 0;
- }
-
- @Override
- public long getMessagesOut()
- {
- return 0;
- }
-
- @Override
- public long getLastIoTime()
- {
- return 0;
- }
-
- @Override
- public int getSessionCount()
- {
- return 0;
- }
-
- @Override
- public Collection<Session> getSessions()
- {
- return null;
- }
-
- @Override
- public AbstractAMQPConnection<?> getUnderlyingConnection()
- {
- return null;
- }
-
- @Override
- public Transport getTransport()
- {
- return null;
- }
-
- @Override
- public boolean isConnectionStopped()
- {
- return false;
- }
-
- @Override
- public String getVirtualHostName()
- {
- return null;
- }
-
- @Override
- public VirtualHost<?, ?, ?> getVirtualHost()
- {
- return null;
- }
-
- @Override
- public void addDeleteTask(final Action task)
- {
-
- }
-
- @Override
- public void removeDeleteTask(final Action task)
- {
-
- }
-
-
- @Override
- public UUID getId()
- {
- return null;
- }
-
- @Override
- public String getName()
- {
- return null;
- }
-
- @Override
- public String getDescription()
- {
- return null;
- }
-
- @Override
- public String getType()
- {
- return null;
- }
-
- @Override
- public Map<String, String> getContext()
- {
- return null;
- }
-
- @Override
- public <T> T getContextValue(final Class<T> clazz, final String
propertyName)
- {
- return null;
- }
-
- @Override
- public <T> T getContextValue(final Class<T> clazz, final Type t, final
String propertyName)
- {
- return null;
- }
-
- @Override
- public Set<String> getContextKeys(final boolean excludeSystem)
- {
- return null;
- }
-
- @Override
- public String getLastUpdatedBy()
- {
- return null;
- }
-
- @Override
- public long getLastUpdatedTime()
- {
- return 0;
- }
-
- @Override
- public String getCreatedBy()
- {
- return null;
- }
-
- @Override
- public long getCreatedTime()
- {
- return 0;
- }
-
- @Override
- public org.apache.qpid.server.model.State getDesiredState()
- {
- return null;
- }
-
- @Override
- public org.apache.qpid.server.model.State getState()
- {
- return null;
- }
-
- @Override
- public void addChangeListener(final ConfigurationChangeListener
listener)
- {
-
- }
-
- @Override
- public boolean removeChangeListener(final ConfigurationChangeListener
listener)
- {
- return false;
- }
-
- @Override
- public <T extends ConfiguredObject> T getParent(final Class<T> clazz)
- {
- return null;
- }
-
- @Override
- public boolean isDurable()
- {
- return false;
- }
-
- @Override
- public LifetimePolicy getLifetimePolicy()
- {
- return null;
- }
-
- @Override
- public Collection<String> getAttributeNames()
- {
- return null;
- }
-
- @Override
- public Object getAttribute(final String name)
- {
- return null;
- }
-
- @Override
- public Map<String, Object> getActualAttributes()
- {
- return null;
- }
-
- @Override
- public Object setAttribute(final String name, final Object expected,
final Object desired)
- throws IllegalStateException, AccessControlException,
IllegalArgumentException
- {
- return null;
- }
-
- @Override
- public Map<String, Number> getStatistics()
- {
- return null;
- }
-
- @Override
- public <C extends ConfiguredObject> Collection<C> getChildren(final
Class<C> clazz)
- {
- return null;
- }
-
- @Override
- public <C extends ConfiguredObject> C getChildById(final Class<C>
clazz, final UUID id)
- {
- return null;
- }
-
- @Override
- public <C extends ConfiguredObject> C getChildByName(final Class<C>
clazz, final String name)
- {
- return null;
- }
-
- @Override
- public <C extends ConfiguredObject> C createChild(final Class<C>
childClass,
- final Map<String,
Object> attributes,
- final
ConfiguredObject... otherParents)
- {
- return null;
- }
-
- @Override
- public <C extends ConfiguredObject> ListenableFuture<C>
createChildAsync(final Class<C> childClass,
-
final Map<String, Object> attributes,
-
final ConfiguredObject... otherParents)
- {
- return null;
- }
-
- @Override
- public void setAttributes(final Map<String, Object> attributes)
- throws IllegalStateException, AccessControlException,
IllegalArgumentException
- {
-
- }
-
- @Override
- public ListenableFuture<Void> setAttributesAsync(final Map<String,
Object> attributes)
- throws IllegalStateException, AccessControlException,
IllegalArgumentException
- {
- return null;
- }
-
- @Override
- public Class<? extends ConfiguredObject> getCategoryClass()
- {
- return null;
- }
-
- @Override
- public Class<? extends ConfiguredObject> getTypeClass()
- {
- return null;
- }
-
- @Override
- public boolean managesChildStorage()
- {
- return false;
- }
-
- @Override
- public <C extends ConfiguredObject<C>> C findConfiguredObject(final
Class<C> clazz, final String name)
- {
- return null;
- }
-
- @Override
- public ConfiguredObjectRecord asObjectRecord()
- {
- return null;
- }
-
- @Override
- public void open()
- {
-
- }
-
- @Override
- public ListenableFuture<Void> openAsync()
- {
- return null;
- }
-
- @Override
- public void close()
- {
-
- }
-
- @Override
- public ListenableFuture<Void> closeAsync()
- {
- return null;
- }
-
- @Override
- public ListenableFuture<Void> deleteAsync()
- {
- return null;
- }
-
- @Override
- public TaskExecutor getTaskExecutor()
- {
- return null;
- }
-
- @Override
- public TaskExecutor getChildExecutor()
- {
- return null;
- }
-
- @Override
- public ConfiguredObjectFactory getObjectFactory()
- {
- return null;
- }
-
- @Override
- public Model getModel()
- {
- return null;
- }
-
- @Override
- public void delete()
- {
-
- }
-
- @Override
- public void decryptSecrets()
- {
-
- }
- }
}
Modified:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java?rev=1689742&r1=1689741&r2=1689742&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
(original)
+++
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
Tue Jul 7 21:07:31 2015
@@ -21,6 +21,7 @@
package org.apache.qpid.server.logging.subjects;
import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -46,7 +47,7 @@ public class BindingLogSubjectTest exten
_testVhost = BrokerTestHelper.createVirtualHost("test");
_routingKey = "RoutingKey";
- _exchange = _testVhost.getExchange("amq.direct");
+ _exchange = (ExchangeImpl) _testVhost.getChildByName(Exchange.class,
"amq.direct");
_queue = mock(AMQQueue.class);
when(_queue.getName()).thenReturn("BindingLogSubjectTest");
when(_queue.getVirtualHost()).thenReturn(_testVhost);
Modified:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java?rev=1689742&r1=1689741&r2=1689742&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java
(original)
+++
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java
Tue Jul 7 21:07:31 2015
@@ -21,6 +21,7 @@
package org.apache.qpid.server.logging.subjects;
import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -40,7 +41,7 @@ public class ExchangeLogSubjectTest exte
_testVhost = BrokerTestHelper.createVirtualHost("test");
- _exchange = _testVhost.getExchange("amq.direct");
+ _exchange = (ExchangeImpl) _testVhost.getChildByName(Exchange.class,
"amq.direct");
_subject = new ExchangeLogSubject(_exchange,_testVhost);
}
Modified:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/hierarchy/AbstractConfiguredObjectTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/hierarchy/AbstractConfiguredObjectTest.java?rev=1689742&r1=1689741&r2=1689742&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/hierarchy/AbstractConfiguredObjectTest.java
(original)
+++
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/hierarchy/AbstractConfiguredObjectTest.java
Tue Jul 7 21:07:31 2015
@@ -22,11 +22,15 @@ package org.apache.qpid.server.model.tes
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
+
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Model;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.test.utils.QpidTestCase;
/**
@@ -95,6 +99,97 @@ public class AbstractConfiguredObjectTes
}
+ public void testGetChildren_NewChild()
+ {
+ final String carName = "myCar";
+ Map<String, Object> carAttributes = new HashMap<>();
+ carAttributes.put(ConfiguredObject.NAME, carName);
+ carAttributes.put(ConfiguredObject.TYPE,
TestKitCarImpl.TEST_KITCAR_TYPE);
+
+ TestCar car = _model.getObjectFactory().create(TestCar.class,
carAttributes);
+
+
+ String engineName = "myEngine";
+ Map<String, Object> engineAttributes = new HashMap<>();
+ engineAttributes.put(ConfiguredObject.NAME, engineName);
+ engineAttributes.put(ConfiguredObject.TYPE,
TestElecEngineImpl.TEST_ELEC_ENGINE_TYPE);
+
+ TestEngine engine = (TestEngine) car.createChild(TestEngine.class,
engineAttributes);
+
+ // Check we can observe the new child from the parent
+
+ assertEquals(1, car.getChildren(TestEngine.class).size());
+ assertEquals(engine, car.getChildById(TestEngine.class,
engine.getId()));
+ assertEquals(engine, car.getChildByName(TestEngine.class,
engine.getName()));
+
+ ListenableFuture attainedChild =
car.getAttainedChildByName(TestEngine.class, engine.getName());
+ assertNotNull(attainedChild);
+ assertTrue("Engine should have already attained state",
attainedChild.isDone());
+ }
+
+ public void testGetChildren_RecoveredChild() throws Exception
+ {
+ final String carName = "myCar";
+ Map<String, Object> carAttributes = new HashMap<>();
+ carAttributes.put(ConfiguredObject.NAME, carName);
+ carAttributes.put(ConfiguredObject.TYPE,
TestKitCarImpl.TEST_KITCAR_TYPE);
+
+ final TestCar car = _model.getObjectFactory().create(TestCar.class,
carAttributes);
+
+ String engineName = "myEngine";
+ final Map<String, Object> engineAttributes = new HashMap<>();
+ engineAttributes.put(ConfiguredObject.NAME, engineName);
+ engineAttributes.put(ConfiguredObject.TYPE,
TestElecEngineImpl.TEST_ELEC_ENGINE_TYPE);
+
+ ConfiguredObjectRecord engineCor = new ConfiguredObjectRecord()
+ {
+ @Override
+ public UUID getId()
+ {
+ return UUID.randomUUID();
+ }
+
+ @Override
+ public String getType()
+ {
+ return TestEngine.class.getSimpleName();
+ }
+
+ @Override
+ public Map<String, Object> getAttributes()
+ {
+ return engineAttributes;
+ }
+
+ @Override
+ public Map<String, UUID> getParents()
+ {
+ return Collections.singletonMap(TestCar.class.getSimpleName(),
car.getId());
+ }
+ };
+
+ // Recover and resolve the child. Resolving the child registers the child with its parent (car),
+ // but the child is not open, so won't have attained state
+ TestEngine engine = (TestEngine)
_model.getObjectFactory().recover(engineCor, car).resolve();
+
+ // Check we can observe the recovered child from the parent
+ assertEquals(1, car.getChildren(TestEngine.class).size());
+ assertEquals(engine, car.getChildById(TestEngine.class,
engine.getId()));
+ assertEquals(engine, car.getChildByName(TestEngine.class,
engine.getName()));
+
+ ListenableFuture attainedChild =
car.getAttainedChildByName(TestEngine.class, engine.getName());
+ assertNotNull(attainedChild);
+ assertFalse("Engine should not have yet attained state",
attainedChild.isDone());
+
+ engine.open();
+
+ assertTrue("Engine should have now attained state",
attainedChild.isDone());
+ assertEquals(engine, attainedChild.get());
+
+
+
+ }
+
public void testDefaultContextVariableWhichRefersToAncestor()
{
final String carName = "myCar";
Modified:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1689742&r1=1689741&r2=1689742&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
(original)
+++
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
Tue Jul 7 21:07:31 2015
@@ -40,6 +40,7 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.util.StateChangeListener;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
@@ -98,7 +99,7 @@ abstract class AbstractQueueTestBase ext
_queue = _virtualHost.createQueue(attributes);
- _exchange = (DirectExchange)
_virtualHost.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME);
+ _exchange = (DirectExchange)
_virtualHost.getChildByName(Exchange.class,
ExchangeDefaults.DIRECT_EXCHANGE_NAME);
}
@Override
Modified:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java?rev=1689742&r1=1689741&r2=1689742&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
(original)
+++
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
Tue Jul 7 21:07:31 2015
@@ -435,8 +435,8 @@ public class SynchronousMessageStoreReco
when(queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT);
when(queue.getId()).thenReturn(queueId);
when(queue.getName()).thenReturn("test-queue");
- when(_virtualHost.getQueue(queueId)).thenReturn(queue);
- when(_virtualHost.getQueue("test-queue")).thenReturn(queue);
+ when(_virtualHost.getAttainedQueue(queueId)).thenReturn(queue);
+ when(_virtualHost.getAttainedQueue("test-queue")).thenReturn(queue);
return queue;
}
Modified:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java?rev=1689742&r1=1689741&r2=1689742&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
(original)
+++
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
Tue Jul 7 21:07:31 2015
@@ -128,7 +128,7 @@ public class VirtualHostQueueCreationTes
private void verifyQueueRegistered(String queueName)
{
- assertNotNull("Queue " + queueName + " was not created",
_virtualHost.getQueue(queueName));
+ assertNotNull("Queue " + queueName + " was not created",
_virtualHost.getChildByName(Queue.class, queueName));
}
public void testPriorityQueueRegistration() throws Exception
@@ -165,7 +165,7 @@ public class VirtualHostQueueCreationTes
//verify that no alternate exchange or DLQ were produced
assertNull("Queue should not have an alternate exchange as DLQ wasn't
enabled", queue.getAlternateExchange());
- assertNull("The DLQ should not exist",
_virtualHost.getQueue(dlQueueName));
+ assertNull("The DLQ should not exist",
_virtualHost.getChildByName(Queue.class, dlQueueName));
verifyRegisteredQueueCount(1);
}
@@ -181,8 +181,8 @@ public class VirtualHostQueueCreationTes
String dlExchangeName = queueName +
VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX;
String dlQueueName = queueName +
AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
- assertNull("The DLQ should not yet exist",
_virtualHost.getQueue(dlQueueName));
- assertNull("The alternate exchange should not yet exist",
_virtualHost.getExchange(dlExchangeName));
+ assertNull("The DLQ should not yet exist",
_virtualHost.getChildByName(Queue.class, dlQueueName));
+ assertNull("The alternate exchange should not yet exist",
_virtualHost.getChildByName(Exchange.class, dlExchangeName));
Map<String,Object> attributes = new HashMap<String, Object>();
@@ -197,10 +197,10 @@ public class VirtualHostQueueCreationTes
assertEquals("Alternate exchange name was not as expected",
dlExchangeName, altExchange.getName());
assertEquals("Alternate exchange type was not as expected",
ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType());
- assertNotNull("The alternate exchange was not registered as expected",
_virtualHost.getExchange(dlExchangeName));
- assertEquals("The registered exchange was not the expected exchange
instance", altExchange, _virtualHost.getExchange(dlExchangeName));
+ assertNotNull("The alternate exchange was not registered as expected",
_virtualHost.getChildByName(Exchange.class, dlExchangeName));
+ assertEquals("The registered exchange was not the expected exchange
instance", altExchange, _virtualHost.getChildByName(Exchange.class,
dlExchangeName));
- AMQQueue dlQueue = _virtualHost.getQueue(dlQueueName);
+ AMQQueue dlQueue = (AMQQueue) _virtualHost.getChildByName(Queue.class,
dlQueueName);
assertNotNull("The DLQ was not registered as expected", dlQueue);
assertTrue("DLQ should have been bound to the alternate exchange",
((ExchangeImpl)altExchange).isBound(dlQueue));
assertNull("DLQ should have no alternate exchange",
dlQueue.getAlternateExchange());
@@ -221,10 +221,10 @@ public class VirtualHostQueueCreationTes
String dlExchangeName = queueName +
VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX;
String dlQueueName = queueName +
AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
- assertNull("The DLQ should not yet exist",
_virtualHost.getQueue(dlQueueName));
- assertNull("The alternate exchange should not yet exist",
_virtualHost.getExchange(dlExchangeName));
+ assertNull("The DLQ should not yet exist",
_virtualHost.getChildByName(Queue.class, dlQueueName));
+ assertNull("The alternate exchange should not yet exist",
_virtualHost.getChildByName(Exchange.class, dlExchangeName));
- Map<String,Object> attributes = new HashMap<String, Object>();
+ Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, queueName);
attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, true);
@@ -238,10 +238,10 @@ public class VirtualHostQueueCreationTes
assertEquals("Alternate exchange name was not as expected",
dlExchangeName, altExchange.getName());
assertEquals("Alternate exchange type was not as expected",
ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType());
- assertNotNull("The alternate exchange was not registered as expected",
_virtualHost.getExchange(dlExchangeName));
- assertEquals("The registered exchange was not the expected exchange
instance", altExchange, _virtualHost.getExchange(dlExchangeName));
+ assertNotNull("The alternate exchange was not registered as expected",
_virtualHost.getChildByName(Exchange.class, dlExchangeName));
+ assertEquals("The registered exchange was not the expected exchange
instance", altExchange, _virtualHost.getChildByName(Exchange.class,
dlExchangeName));
- AMQQueue dlQueue = _virtualHost.getQueue(dlQueueName);
+ AMQQueue dlQueue = (AMQQueue) _virtualHost.getChildByName(Queue.class,
dlQueueName);
assertNotNull("The DLQ was not registered as expected", dlQueue);
assertTrue("DLQ should have been bound to the alternate exchange",
((ExchangeImpl)altExchange).isBound(dlQueue));
assertNull("DLQ should have no alternate exchange",
dlQueue.getAlternateExchange());
@@ -264,8 +264,9 @@ public class VirtualHostQueueCreationTes
String dlExchangeName = queueName +
VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX;
String dlQueueName = queueName +
AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
- assertNull("The DLQ should not yet exist",
_virtualHost.getQueue(dlQueueName));
- assertNull("The alternate exchange should not exist",
_virtualHost.getExchange(dlExchangeName));
+ assertNull("The DLQ should not yet exist",
_virtualHost.getChildByName(Queue.class, dlQueueName));
+ assertNull("The alternate exchange should not exist",
_virtualHost.getChildByName(Exchange.class,
+
dlExchangeName));
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, queueName);
@@ -274,9 +275,8 @@ public class VirtualHostQueueCreationTes
AMQQueue queue = _virtualHost.createQueue(attributes);
assertNull("Queue should not have an alternate exchange as DLQ is
disabled", queue.getAlternateExchange());
- assertNull("The alternate exchange should still not exist",
_virtualHost.getExchange(dlExchangeName));
-
- assertNull("The DLQ should still not exist",
_virtualHost.getQueue(dlQueueName));
+ assertNull("The alternate exchange should still not exist",
_virtualHost.getChildByName(Exchange.class, dlExchangeName));
+ assertNull("The DLQ should still not exist",
_virtualHost.getChildByName(Queue.class, dlQueueName));
//only 1 queue should have been registered
verifyRegisteredQueueCount(1);
@@ -294,10 +294,10 @@ public class VirtualHostQueueCreationTes
String dlExchangeName = queueName +
VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX;
String dlQueueName = queueName +
AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
- assertNull("The DLQ should not yet exist",
_virtualHost.getQueue(dlQueueName));
- assertNull("The alternate exchange should not exist",
_virtualHost.getExchange(dlExchangeName));
+ assertNull("The DLQ should not yet exist",
_virtualHost.getChildByName(Queue.class, dlQueueName));
+ assertNull("The alternate exchange should not exist",
_virtualHost.getChildByName(Exchange.class, dlExchangeName));
- Map<String,Object> attributes = new HashMap<String, Object>();
+ Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, queueName);
@@ -312,8 +312,8 @@ public class VirtualHostQueueCreationTes
//ensure that the autodelete property overrides the request to enable DLQ
assertNull("Queue should not have an alternate exchange as queue is
autodelete", queue.getAlternateExchange());
- assertNull("The alternate exchange should not exist as queue is
autodelete", _virtualHost.getExchange(dlExchangeName));
- assertNull("The DLQ should not exist as queue is autodelete",
_virtualHost.getQueue(dlQueueName));
+ assertNull("The alternate exchange should not exist as queue is
autodelete", _virtualHost.getChildByName( Exchange.class, dlExchangeName));
+ assertNull("The DLQ should not exist as queue is autodelete",
_virtualHost.getChildByName(Queue.class, dlQueueName));
//only 1 queue should have been registered
verifyRegisteredQueueCount(1);
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1689742&r1=1689741&r2=1689742&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
Tue Jul 7 21:07:31 2015
@@ -215,7 +215,7 @@ public class ServerSessionDelegate exten
VirtualHostImpl<?,?,?> vhost = getVirtualHost(session);
final Collection<MessageSource> sources = new HashSet<>();
- final MessageSource queue = vhost.getMessageSource(queueName);
+ final MessageSource queue =
vhost.getAttainedMessageSource(queueName);
if(queue != null)
{
sources.add(queue);
@@ -230,7 +230,7 @@ public class ServerSessionDelegate exten
sourceName = sourceName.trim();
if(sourceName.length() != 0)
{
- MessageSource source =
vhost.getMessageSource(sourceName);
+ MessageSource source =
vhost.getAttainedMessageSource(sourceName);
if(source == null)
{
sources.clear();
@@ -940,7 +940,7 @@ public class ServerSessionDelegate exten
private ExchangeImpl getExchange(Session session, String exchangeName)
{
- return getVirtualHost(session).getExchange(exchangeName);
+ return getVirtualHost(session).getAttainedExchange(exchangeName);
}
private MessageDestination getDestinationForMessage(Session ssn,
MessageTransfer xfr)
@@ -950,7 +950,7 @@ public class ServerSessionDelegate exten
MessageDestination destination;
if(xfr.hasDestination())
{
- destination =
virtualHost.getMessageDestination(xfr.getDestination());
+ destination =
virtualHost.getAttainedMessageDestination(xfr.getDestination());
if(destination == null)
{
destination = virtualHost.getDefaultDestination();
@@ -1081,8 +1081,8 @@ public class ServerSessionDelegate exten
{
method.setBindingKey(method.getQueue());
}
- AMQQueue queue = virtualHost.getQueue(method.getQueue());
- ExchangeImpl exchange = virtualHost.getExchange(exchangeName);
+ AMQQueue queue =
virtualHost.getAttainedQueue(method.getQueue());
+ ExchangeImpl exchange =
virtualHost.getAttainedExchange(exchangeName);
if(queue == null)
{
exception(session, method, ExecutionErrorCode.NOT_FOUND,
"Queue: '" + method.getQueue() + "' not found");
@@ -1141,8 +1141,8 @@ public class ServerSessionDelegate exten
}
else
{
- AMQQueue queue = virtualHost.getQueue(method.getQueue());
- ExchangeImpl exchange =
virtualHost.getExchange(method.getExchange());
+ AMQQueue queue = virtualHost.getAttainedQueue(method.getQueue());
+ ExchangeImpl exchange =
virtualHost.getAttainedExchange(method.getExchange());
if(queue == null)
{
exception(session, method, ExecutionErrorCode.NOT_FOUND,
"Queue: '" + method.getQueue() + "' not found");
@@ -1181,7 +1181,7 @@ public class ServerSessionDelegate exten
if(!nameNullOrEmpty(method.getExchange()))
{
isDefaultExchange = false;
- exchange = virtualHost.getExchange(method.getExchange());
+ exchange = virtualHost.getAttainedExchange(method.getExchange());
if(exchange == null)
{
@@ -1357,12 +1357,12 @@ public class ServerSessionDelegate exten
private MessageSource getMessageSource(Session session, String queue)
{
- return getVirtualHost(session).getMessageSource(queue);
+ return getVirtualHost(session).getAttainedMessageSource(queue);
}
private AMQQueue getQueue(Session session, String queue)
{
- return getVirtualHost(session).getQueue(queue);
+ return getVirtualHost(session).getAttainedQueue(queue);
}
@Override
@@ -1380,7 +1380,7 @@ public class ServerSessionDelegate exten
if(method.getPassive())
{
- queue = virtualHost.getQueue(queueName);
+ queue = virtualHost.getAttainedQueue(queueName);
if (queue == null)
{
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1689742&r1=1689741&r2=1689742&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
Tue Jul 7 21:07:31 2015
@@ -2106,7 +2106,7 @@ public class AMQChannel
sync();
String queueName = AMQShortString.toString(queue);
- MessageSource queue1 = queueName == null ? getDefaultQueue() :
vHost.getMessageSource(queueName);
+ MessageSource queue1 = queueName == null ? getDefaultQueue() :
vHost.getAttainedMessageSource(queueName);
final Collection<MessageSource> sources = new HashSet<>();
if (queue1 != null)
{
@@ -2122,7 +2122,7 @@ public class AMQChannel
sourceName = sourceName.trim();
if (sourceName.length() != 0)
{
- MessageSource source = vHost.getMessageSource(sourceName);
+ MessageSource source =
vHost.getAttainedMessageSource(sourceName);
if (source == null)
{
sources.clear();
@@ -2229,7 +2229,7 @@ public class AMQChannel
VirtualHostImpl vHost = _connection.getVirtualHost();
sync();
- MessageSource queue = queueName == null ? getDefaultQueue() :
vHost.getMessageSource(queueName.toString());
+ MessageSource queue = queueName == null ? getDefaultQueue() :
vHost.getAttainedMessageSource(queueName.toString());
if (queue == null)
{
if (_logger.isDebugEnabled())
@@ -2318,7 +2318,7 @@ public class AMQChannel
}
else
{
- destination =
vHost.getMessageDestination(exchangeName.toString());
+ destination =
vHost.getAttainedMessageDestination(exchangeName.toString());
}
// if the exchange does not exist we raise a channel exception
@@ -2704,7 +2704,7 @@ public class AMQChannel
}
else
{
- MessageSource queue =
virtualHost.getMessageSource(queueName.toString());
+ MessageSource queue =
virtualHost.getAttainedMessageSource(queueName.toString());
if (queue == null)
{
replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND;
@@ -2721,14 +2721,14 @@ public class AMQChannel
{
if (queueName == null)
{
- replyCode = virtualHost.getQueue(routingKey.toString()) ==
null
+ replyCode =
virtualHost.getAttainedQueue(routingKey.toString()) == null
? ExchangeBoundOkBody.NO_QUEUE_BOUND_WITH_RK
: ExchangeBoundOkBody.OK;
replyText = null;
}
else
{
- AMQQueue queue =
virtualHost.getQueue(queueName.toString());
+ AMQQueue queue =
virtualHost.getAttainedQueue(queueName.toString());
if (queue == null)
{
@@ -2747,7 +2747,7 @@ public class AMQChannel
}
else
{
- ExchangeImpl exchange =
virtualHost.getExchange(exchangeName.toString());
+ ExchangeImpl exchange =
virtualHost.getAttainedExchange(exchangeName.toString());
if (exchange == null)
{
@@ -2772,7 +2772,7 @@ public class AMQChannel
else
{
- AMQQueue queue =
virtualHost.getQueue(queueName.toString());
+ AMQQueue queue =
virtualHost.getAttainedQueue(queueName.toString());
if (queue == null)
{
replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND;
@@ -2799,7 +2799,7 @@ public class AMQChannel
}
else if (queueName != null)
{
- AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ AMQQueue queue =
virtualHost.getAttainedQueue(queueName.toString());
if (queue == null)
{
replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND;
@@ -2893,7 +2893,7 @@ public class AMQChannel
{
if (passive)
{
- exchange = virtualHost.getExchange(exchangeName.toString());
+ exchange =
virtualHost.getAttainedExchange(exchangeName.toString());
if (exchange == null)
{
closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange: '" + exchangeName + "'");
@@ -2948,7 +2948,7 @@ public class AMQChannel
}
catch (ReservedExchangeNameException e)
{
- Exchange existing =
virtualHost.getExchange(exchangeName.toString());
+ Exchange existing =
virtualHost.getAttainedExchange(exchangeName.toString());
if (existing != null && new
AMQShortString(existing.getType()).equals(type))
{
sync();
@@ -3042,7 +3042,7 @@ public class AMQChannel
{
final String exchangeName = exchangeStr.toString();
- final ExchangeImpl exchange =
virtualHost.getExchange(exchangeName);
+ final ExchangeImpl exchange =
virtualHost.getAttainedExchange(exchangeName);
if (exchange == null)
{
closeChannel(AMQConstant.NOT_FOUND, "No such exchange: '" + exchangeStr + "'");
@@ -3108,7 +3108,7 @@ public class AMQChannel
}
else
{
- queue = virtualHost.getQueue(queueName.toString());
+ queue = virtualHost.getAttainedQueue(queueName.toString());
routingKey = routingKey == null ? AMQShortString.EMPTY_STRING :
routingKey.intern();
}
@@ -3130,7 +3130,7 @@ public class AMQChannel
final String exchangeName = exchange.toString();
- final ExchangeImpl exch = virtualHost.getExchange(exchangeName);
+ final ExchangeImpl exch =
virtualHost.getAttainedExchange(exchangeName);
if (exch == null)
{
closeChannel(AMQConstant.NOT_FOUND,
@@ -3221,7 +3221,7 @@ public class AMQChannel
if (passive)
{
- queue = virtualHost.getQueue(queueName.toString());
+ queue = virtualHost.getAttainedQueue(queueName.toString());
if (queue == null)
{
closeChannel(AMQConstant.NOT_FOUND,
@@ -3402,7 +3402,7 @@ public class AMQChannel
}
else
{
- queue = virtualHost.getQueue(queueName.toString());
+ queue = virtualHost.getAttainedQueue(queueName.toString());
}
if (queue == null)
@@ -3465,7 +3465,7 @@ public class AMQChannel
_connection.closeConnection(AMQConstant.NOT_ALLOWED, "No queue specified.", getChannelId());
}
- else if ((queueName != null) && (queue =
virtualHost.getQueue(queueName.toString())) == null)
+ else if ((queueName != null) && (queue =
virtualHost.getAttainedQueue(queueName.toString())) == null)
{
closeChannel(AMQConstant.NOT_FOUND, "Queue '" + queueName + "' does not exist.");
}
@@ -3517,7 +3517,7 @@ public class AMQChannel
final boolean useDefaultQueue = queueName == null;
final AMQQueue queue = useDefaultQueue
? getDefaultQueue()
- : virtualHost.getQueue(queueName.toString());
+ : virtualHost.getAttainedQueue(queueName.toString());
if (queue == null)
@@ -3537,7 +3537,7 @@ public class AMQChannel
else
{
- final ExchangeImpl exch =
virtualHost.getExchange(exchange.toString());
+ final ExchangeImpl exch =
virtualHost.getAttainedExchange(exchange.toString());
if (exch == null)
{
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java?rev=1689742&r1=1689741&r2=1689742&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java Tue Jul 7 21:07:31 2015
@@ -30,6 +30,7 @@ import org.apache.qpid.framing.BasicCont
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -86,7 +87,7 @@ public class BrokerTestHelper_0_8 extend
}
else
{
- destination = channel.getVirtualHost().getExchange(exchangeName);
+ destination = (MessageDestination)
channel.getVirtualHost().getChildByName(Exchange.class, exchangeName);
}
for (int count = 0; count < numberOfMessages; count++)
{
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1689742&r1=1689741&r2=1689742&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Tue Jul 7 21:07:31 2015
@@ -208,7 +208,7 @@ public class SendingLink_1_0 implements
name = UUID.randomUUID().toString();
}
- AMQQueue queue = _vhost.getQueue(name);
+ AMQQueue queue = _vhost.getAttainedQueue(name);
ExchangeImpl exchange = exchangeDestination.getExchange();
if(queue == null)
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1689742&r1=1689741&r2=1689742&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Tue Jul 7 21:07:31 2015
@@ -163,7 +163,7 @@ public class Session_1_0 implements Sess
if(!addr.startsWith("/") && addr.contains("/"))
{
String[] parts = addr.split("/",2);
- ExchangeImpl exchg =
getVirtualHost().getExchange(parts[0]);
+ ExchangeImpl exchg =
getVirtualHost().getAttainedExchange(parts[0]);
if(exchg != null)
{
ExchangeDestination exchangeDestination =
@@ -180,14 +180,14 @@ public class Session_1_0 implements Sess
}
else
{
- MessageSource queue =
getVirtualHost().getMessageSource(addr);
+ MessageSource queue =
getVirtualHost().getAttainedMessageSource(addr);
if(queue != null)
{
destination = new MessageSourceDestination(queue);
}
else
{
- ExchangeImpl exchg =
getVirtualHost().getExchange(addr);
+ ExchangeImpl exchg =
getVirtualHost().getAttainedExchange(addr);
if(exchg != null)
{
destination = new ExchangeDestination(exchg,
source.getDurable(), source.getExpiryPolicy());
@@ -325,7 +325,7 @@ public class Session_1_0 implements Sess
if(!addr.startsWith("/") && addr.contains("/"))
{
String[] parts = addr.split("/",2);
- ExchangeImpl exchange =
getVirtualHost().getExchange(parts[0]);
+ ExchangeImpl exchange =
getVirtualHost().getAttainedExchange(parts[0]);
if(exchange != null)
{
ExchangeDestination exchangeDestination =
@@ -346,7 +346,7 @@ public class Session_1_0 implements Sess
}
else
{
- MessageDestination messageDestination =
getVirtualHost().getMessageDestination(addr);
+ MessageDestination messageDestination =
getVirtualHost().getAttainedMessageDestination(addr);
if(messageDestination != null)
{
destination = new
NodeReceivingDestination(messageDestination, target.getDurable(),
@@ -354,7 +354,7 @@ public class Session_1_0 implements Sess
}
else
{
- AMQQueue queue =
getVirtualHost().getQueue(addr);
+ AMQQueue queue =
getVirtualHost().getAttainedQueue(addr);
if(queue != null)
{
Modified:
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java?rev=1689742&r1=1689741&r2=1689742&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java Tue Jul 7 21:07:31 2015
@@ -161,7 +161,7 @@ public class MessageConverter_1_0_to_v0_
String[] parts = origReplyTo.split("/",2);
replyTo = new ReplyTo(parts[0],parts[1]);
}
- else if(vhost.getExchange(origReplyTo) != null)
+ else if(vhost.getAttainedExchange(origReplyTo) != null)
{
replyTo = new ReplyTo(origReplyTo,"");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org