Author: rgodfrey
Date: Fri May 1 23:18:23 2015
New Revision: 1677253
URL: http://svn.apache.org/r1677253
Log:
QPID-6525 : Use a system node to provide per-virtual host properties which we
can then use to set the temporary queue prefix on a per vhost basis
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
(with props)
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
(with props)
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeCreator.java
(with props)
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java
qpid/java/trunk/pom.xml
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/TemporaryQueuePrefixTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/ChannelLoggingTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/management/jmx/StatisticsTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/ConnectionRestTest.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java?rev=1677253&r1=1677252&r2=1677253&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
Fri May 1 23:18:23 2015
@@ -185,6 +185,8 @@ public interface Broker<X extends Broker
void setEventLogger(EventLogger eventLogger);
+ boolean isVirtualHostPropertiesNodeEnabled();
+
AuthenticationProvider<?> getManagementModeAuthenticationProvider();
void assignTargetSizes();
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java?rev=1677253&r1=1677252&r2=1677253&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
Fri May 1 23:18:23 2015
@@ -49,11 +49,14 @@ import org.apache.qpid.server.logging.me
import org.apache.qpid.server.model.*;
import org.apache.qpid.server.plugin.ConfigurationSecretEncrypterFactory;
import org.apache.qpid.server.plugin.PluggableFactoryLoader;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
+import org.apache.qpid.server.plugin.SystemNodeCreator;
import org.apache.qpid.server.security.SecurityManager;
import
org.apache.qpid.server.security.auth.manager.SimpleAuthenticationManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostPropertiesNodeCreator;
import org.apache.qpid.util.SystemUtils;
public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter>
implements Broker<BrokerAdapter>, StatisticsGatherer
@@ -100,6 +103,7 @@ public class BrokerAdapter extends Abstr
@ManagedAttributeField
private String _confidentialConfigurationEncryptionProvider;
+ private final boolean _virtualHostPropertiesNodeEnabled;
@ManagedObjectFactoryConstructor
public BrokerAdapter(Map<String, Object> attributes,
@@ -119,6 +123,11 @@ public class BrokerAdapter extends Abstr
authManager.addUser(BrokerOptions.MANAGEMENT_MODE_USER_NAME,
_parent.getManagementModePassword());
_managementModeAuthenticationProvider = authManager;
}
+
+ QpidServiceLoader qpidServiceLoader = new QpidServiceLoader();
+ final Set<String> systemNodeCreatorTypes =
qpidServiceLoader.getInstancesByType(SystemNodeCreator.class).keySet();
+ _virtualHostPropertiesNodeEnabled =
systemNodeCreatorTypes.contains(VirtualHostPropertiesNodeCreator.TYPE);
+
_messagesDelivered = new StatisticsCounter("messages-delivered");
_dataDelivered = new StatisticsCounter("bytes-delivered");
_messagesReceived = new StatisticsCounter("messages-received");
@@ -776,7 +785,11 @@ public class BrokerAdapter extends Abstr
}
}
-
+ @Override
+ public boolean isVirtualHostPropertiesNodeEnabled()
+ {
+ return _virtualHostPropertiesNodeEnabled;
+ }
public AuthenticationProvider<?> getManagementModeAuthenticationProvider()
{
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1677253&view=auto
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
(added)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
Fri May 1 23:18:23 2015
@@ -0,0 +1,555 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
+
+public abstract class AbstractSystemMessageSource implements MessageSource
+{
+ protected final UUID _id;
+ protected final String _name;
+ protected final VirtualHost<?, ?, ?> _virtualHost;
+ private final CopyOnWriteArrayList<ConsumerRegistrationListener<? super
MessageSource>>
+ _consumerRegistrationListeners =
+ new CopyOnWriteArrayList<>();
+ private Map<String, Consumer> _consumers = new ConcurrentHashMap<>();
+
+ public AbstractSystemMessageSource(
+ String name, final VirtualHostImpl virtualHost)
+ {
+ _name = name;
+ _id = UUID.nameUUIDFromBytes((getClass().getSimpleName() + "/" +
virtualHost.getName() + "/" + name).getBytes(
+ StandardCharsets.UTF_8));
+ _virtualHost = virtualHost;
+ }
+
+ @Override
+ public String getName()
+ {
+ return _name;
+ }
+
+ @Override
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ @Override
+ public MessageDurability getMessageDurability()
+ {
+ return MessageDurability.NEVER;
+ }
+
+ protected abstract InternalMessage createMessage();
+
+ @Override
+ public Consumer addConsumer(final ConsumerTarget target,
+ final FilterManager filters,
+ final Class<? extends ServerMessage>
messageClass,
+ final String consumerName,
+ final EnumSet<ConsumerImpl.Option> options)
+ throws ExistingExclusiveConsumer,
ExistingConsumerPreventsExclusive,
+ ConsumerAccessRefused
+ {
+ final Consumer consumer = new Consumer(consumerName, target);
+ target.consumerAdded(consumer);
+ _consumers.put(consumerName, consumer);
+ for (ConsumerRegistrationListener<? super MessageSource> listener :
_consumerRegistrationListeners)
+ {
+ listener.consumerAdded(this, consumer);
+ }
+ return consumer;
+ }
+
+ @Override
+ public Collection<? extends ConsumerImpl> getConsumers()
+ {
+ return new ArrayList<>(_consumers.values());
+ }
+
+ @Override
+ public void addConsumerRegistrationListener(final
ConsumerRegistrationListener<? super MessageSource> listener)
+ {
+ _consumerRegistrationListeners.add(listener);
+ }
+
+ @Override
+ public void removeConsumerRegistrationListener(final
ConsumerRegistrationListener<? super MessageSource> listener)
+ {
+ _consumerRegistrationListeners.remove(listener);
+ }
+
+ @Override
+ public boolean verifySessionAccess(final AMQSessionModel<?, ?> session)
+ {
+ return true;
+ }
+
+ class Consumer implements ConsumerImpl
+ {
+
+ private final long _id =
ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement();
+ private final List<PropertiesMessageInstance> _queue =
+ Collections.synchronizedList(new
ArrayList<PropertiesMessageInstance>());
+ private final ConsumerTarget _target;
+ private final String _name;
+ private final StateChangeListener<ConsumerTarget,
ConsumerTarget.State> _targetChangeListener =
+ new Consumer.TargetChangeListener();
+
+
+ public Consumer(final String consumerName, ConsumerTarget target)
+ {
+ _name = consumerName;
+ _target = target;
+ target.addStateListener(_targetChangeListener);
+ }
+
+ @Override
+ public void externalStateChange()
+ {
+
+ }
+
+ @Override
+ public ConsumerTarget getTarget()
+ {
+ return _target;
+ }
+
+ @Override
+ public long getBytesOut()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getMessagesOut()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getUnacknowledgedBytes()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getUnacknowledgedMessages()
+ {
+ return 0;
+ }
+
+ @Override
+ public AMQSessionModel getSessionModel()
+ {
+ return _target.getSessionModel();
+ }
+
+ @Override
+ public MessageSource getMessageSource()
+ {
+ return AbstractSystemMessageSource.this;
+ }
+
+ @Override
+ public long getConsumerNumber()
+ {
+ return _id;
+ }
+
+ @Override
+ public boolean isSuspended()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isClosed()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean acquires()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean seesRequeues()
+ {
+ return false;
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
+
+ @Override
+ public boolean trySendLock()
+ {
+ return _target.trySendLock();
+ }
+
+ @Override
+ public void getSendLock()
+ {
+ _target.getSendLock();
+ }
+
+ @Override
+ public void releaseSendLock()
+ {
+ _target.releaseSendLock();
+ }
+
+ @Override
+ public boolean isActive()
+ {
+ return false;
+ }
+
+ @Override
+ public String getName()
+ {
+ return _name;
+ }
+
+ @Override
+ public void flush()
+ {
+
+ }
+
+
+ void send(final InternalMessage response)
+ {
+ _target.getSendLock();
+ try
+ {
+ final PropertiesMessageInstance
+ responseEntry = new PropertiesMessageInstance(this,
response);
+ if (_queue.isEmpty() && _target.allocateCredit(response))
+ {
+ _target.send(this, responseEntry, false);
+ }
+ else
+ {
+ _queue.add(responseEntry);
+ }
+ }
+ finally
+ {
+ _target.releaseSendLock();
+ }
+ }
+
+ private class TargetChangeListener implements
StateChangeListener<ConsumerTarget, ConsumerTarget.State>
+ {
+ @Override
+ public void stateChanged(final ConsumerTarget object,
+ final ConsumerTarget.State oldState,
+ final ConsumerTarget.State newState)
+ {
+ if (newState == ConsumerTarget.State.ACTIVE)
+ {
+ deliverMessages();
+ }
+ }
+ }
+
+ private void deliverMessages()
+ {
+ _target.getSendLock();
+ try
+ {
+ while (!_queue.isEmpty())
+ {
+
+ final PropertiesMessageInstance propertiesMessageInstance
= _queue.get(0);
+ if (!_target.isSuspended() &&
_target.allocateCredit(propertiesMessageInstance.getMessage()))
+ {
+ _queue.remove(0);
+ _target.send(this, propertiesMessageInstance, false);
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+ finally
+ {
+ _target.releaseSendLock();
+ }
+ }
+
+ }
+
+ class PropertiesMessageInstance implements MessageInstance
+ {
+ private final Consumer _consumer;
+ private int _deliveryCount;
+ private boolean _isRedelivered;
+ private boolean _isDelivered;
+ private boolean _isDeleted;
+ private InternalMessage _message;
+
+ PropertiesMessageInstance(final Consumer consumer, final
InternalMessage message)
+ {
+ _consumer = consumer;
+ _message = message;
+ }
+
+ @Override
+ public int getDeliveryCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public void incrementDeliveryCount()
+ {
+ _deliveryCount++;
+ }
+
+ @Override
+ public void decrementDeliveryCount()
+ {
+ _deliveryCount--;
+ }
+
+ @Override
+ public void addStateChangeListener(final StateChangeListener<? super
MessageInstance, State> listener)
+ {
+
+ }
+
+ @Override
+ public boolean removeStateChangeListener(final StateChangeListener<?
super MessageInstance, State> listener)
+ {
+ return false;
+ }
+
+
+ @Override
+ public boolean acquiredByConsumer()
+ {
+ return !isDeleted();
+ }
+
+ @Override
+ public ConsumerImpl getAcquiringConsumer()
+ {
+ return _consumer;
+ }
+
+ @Override
+ public MessageEnqueueRecord getEnqueueRecord()
+ {
+ return null;
+ }
+
+ @Override
+ public boolean isAcquiredBy(final ConsumerImpl consumer)
+ {
+ return consumer == _consumer && !isDeleted();
+ }
+
+ @Override
+ public boolean removeAcquisitionFromConsumer(final ConsumerImpl
consumer)
+ {
+ return consumer == _consumer;
+ }
+
+ @Override
+ public void setRedelivered()
+ {
+ _isRedelivered = true;
+ }
+
+ @Override
+ public boolean isRedelivered()
+ {
+ return _isRedelivered;
+ }
+
+ @Override
+ public Consumer getDeliveredConsumer()
+ {
+ return isDeleted() ? null : _consumer;
+ }
+
+ @Override
+ public void reject()
+ {
+ delete();
+ }
+
+ @Override
+ public boolean isRejectedBy(final ConsumerImpl consumer)
+ {
+ return false;
+ }
+
+ @Override
+ public boolean getDeliveredToConsumer()
+ {
+ return _isDelivered;
+ }
+
+ @Override
+ public boolean expired()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean acquire(final ConsumerImpl sub)
+ {
+ return false;
+ }
+
+ @Override
+ public boolean lockAcquisition()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean unlockAcquisition()
+ {
+ return false;
+ }
+
+ @Override
+ public int getMaximumDeliveryCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public int routeToAlternate(final Action<? super MessageInstance>
action,
+ final ServerTransaction txn)
+ {
+ return 0;
+ }
+
+
+ @Override
+ public Filterable asFilterable()
+ {
+ return null;
+ }
+
+ @Override
+ public boolean isAvailable()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean acquire()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isAcquired()
+ {
+ return !isDeleted();
+ }
+
+ @Override
+ public void release()
+ {
+ delete();
+ }
+
+ @Override
+ public boolean resend()
+ {
+ return false;
+ }
+
+ @Override
+ public void delete()
+ {
+ _isDeleted = true;
+ }
+
+ @Override
+ public boolean isDeleted()
+ {
+ return _isDeleted;
+ }
+
+ @Override
+ public ServerMessage getMessage()
+ {
+ return _message;
+ }
+
+ @Override
+ public InstanceProperties getInstanceProperties()
+ {
+ return InstanceProperties.EMPTY;
+ }
+
+ @Override
+ public TransactionLogResource getOwningResource()
+ {
+ return AbstractSystemMessageSource.this;
+ }
+
+ }
+}
Propchange:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java?rev=1677253&view=auto
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
(added)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
Fri May 1 23:18:23 2015
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.message.internal.InternalMessageHeader;
+
+public class VirtualHostPropertiesNode extends AbstractSystemMessageSource
+{
+
+ public VirtualHostPropertiesNode(final VirtualHostImpl virtualHost)
+ {
+ this(virtualHost, "$virtualhostProperties");
+ }
+ public VirtualHostPropertiesNode(final VirtualHostImpl virtualHost, String
name)
+ {
+ super(name, virtualHost);
+ }
+
+ @Override
+ public Consumer addConsumer(final ConsumerTarget target,
+ final FilterManager filters,
+ final Class<? extends ServerMessage>
messageClass,
+ final String consumerName,
+ final EnumSet<ConsumerImpl.Option> options)
+ throws ExistingExclusiveConsumer,
ExistingConsumerPreventsExclusive,
+ ConsumerAccessRefused
+ {
+ final Consumer consumer = super.addConsumer(target, filters,
messageClass, consumerName, options);
+ consumer.send(createMessage());
+ target.queueEmpty();
+ return consumer;
+ }
+
+ @Override
+ protected InternalMessage createMessage()
+ {
+
+ Map<String, Object> headers = new HashMap<>();
+
+ final List<String> globalAddresseDomains =
_virtualHost.getGlobalAddressDomains();
+ if (globalAddresseDomains != null && !globalAddresseDomains.isEmpty())
+ {
+ String primaryDomain = globalAddresseDomains.get(0);
+ if(primaryDomain != null)
+ {
+ primaryDomain = primaryDomain.trim();
+ if(!primaryDomain.endsWith("/"))
+ {
+ primaryDomain += "/";
+ }
+ headers.put("virtualHost.temporaryQueuePrefix", primaryDomain);
+ }
+ }
+
+ InternalMessageHeader header = new InternalMessageHeader(headers,
+ null, 0l,
null, null, UUID.randomUUID().toString(),
+ null, null,
(byte) 4, System.currentTimeMillis(),
+ null, null);
+ final InternalMessage message =
+
InternalMessage.createBytesMessage(_virtualHost.getMessageStore(), header, new
byte[0]);
+ message.setInitialRoutingAddress(getName());
+ return message;
+ }
+
+
+}
Propchange:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeCreator.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeCreator.java?rev=1677253&view=auto
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeCreator.java
(added)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeCreator.java
Fri May 1 23:18:23 2015
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.plugin.PluggableService;
+import org.apache.qpid.server.plugin.SystemNodeCreator;
+
+@PluggableService
+public class VirtualHostPropertiesNodeCreator implements SystemNodeCreator
+{
+
+ public static final String TYPE = "VIRTUALHOSTPROPERTIES";
+
+ @Override
+ public void register(final SystemNodeRegistry registry)
+ {
+ registry.registerSystemNode(new
VirtualHostPropertiesNode(registry.getVirtualHost()));
+ }
+
+ @Override
+ public String getType()
+ {
+ return TYPE;
+ }
+}
Propchange:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeCreator.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1677253&r1=1677252&r2=1677253&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
Fri May 1 23:18:23 2015
@@ -113,33 +113,12 @@ public class ServerConnectionDelegate ex
map.put(ServerPropertyNames.VERSION,
QpidProperties.getReleaseVersion());
map.put(ServerPropertyNames.QPID_BUILD,
QpidProperties.getBuildVersion());
map.put(ServerPropertyNames.QPID_INSTANCE_NAME, broker.getName());
- map.put(ServerPropertyNames.QPID_TEMPORARY_QUEUE_PREFIX,
getTemporaryQueuePrefix(broker));
map.put(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED,
String.valueOf(broker.isMessageCompressionEnabled()));
+
map.put(ConnectionStartProperties.QPID_VIRTUALHOST_PROPERTIES_SUPPORTED,
String.valueOf(broker.isVirtualHostPropertiesNodeEnabled()));
return map;
}
- private static String getTemporaryQueuePrefix(final Broker<?> broker)
- {
- String prefix = "";
- if
(broker.getContextKeys(false).contains(ServerPropertyNames.QPID_TEMPORARY_QUEUE_PREFIX))
- {
- prefix = broker.getContextValue(String.class,
ServerPropertyNames.QPID_TEMPORARY_QUEUE_PREFIX);
- }
- if (prefix != null)
- {
- if (prefix.length() > 0 && !prefix.endsWith("/"))
- {
- prefix += "/";
- }
- }
- else
- {
- prefix = "";
- }
- return prefix;
- }
-
public ServerSession getSession(Connection conn, SessionAttach atc)
{
SessionDelegate serverSessionDelegate = new ServerSessionDelegate();
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1677253&r1=1677252&r2=1677253&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
Fri May 1 23:18:23 2015
@@ -1164,7 +1164,8 @@ public class ServerSessionDelegate exten
ExchangeBoundResult result = new ExchangeBoundResult();
VirtualHostImpl virtualHost = getVirtualHost(session);
ExchangeImpl exchange;
- AMQQueue queue;
+ MessageSource source;
+ AMQQueue<?> queue;
boolean isDefaultExchange;
if(!nameNullOrEmpty(method.getExchange()))
{
@@ -1221,90 +1222,96 @@ public class ServerSessionDelegate exten
}
else if(method.hasQueue())
{
+ source = getMessageSource(session, method.getQueue());
- queue = getQueue(session, method.getQueue());
- if(queue == null)
+ if(source == null)
{
result.setQueueNotFound(true);
}
-
-
- if(exchange != null && queue != null)
+ if(source == null || source instanceof AMQQueue)
{
+ queue = (AMQQueue<?>) source;
- boolean queueMatched = exchange.isBound(queue);
+ if (exchange != null && queue != null)
+ {
- result.setQueueNotMatched(!queueMatched);
+ boolean queueMatched = exchange.isBound(queue);
+ result.setQueueNotMatched(!queueMatched);
- if(method.hasBindingKey())
- {
- if(queueMatched)
+ if (method.hasBindingKey())
{
- final boolean keyMatched =
exchange.isBound(method.getBindingKey(), queue);
- result.setKeyNotMatched(!keyMatched);
- if(method.hasArguments())
+
+ if (queueMatched)
{
- if(keyMatched)
+ final boolean keyMatched =
exchange.isBound(method.getBindingKey(), queue);
+ result.setKeyNotMatched(!keyMatched);
+ if (method.hasArguments())
{
-
result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(),
method.getArguments(), queue));
+ if (keyMatched)
+ {
+
result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(),
+
method.getArguments(),
+
queue));
+ }
+ else
+ {
+
result.setArgsNotMatched(!exchange.isBound(method.getArguments(), queue));
+ }
}
- else
+ }
+ else
+ {
+ boolean keyMatched =
exchange.isBound(method.getBindingKey());
+ result.setKeyNotMatched(!keyMatched);
+ if (method.hasArguments())
{
-
result.setArgsNotMatched(!exchange.isBound(method.getArguments(), queue));
+ if (keyMatched)
+ {
+
result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(),
+
method.getArguments()));
+ }
+ else
+ {
+
result.setArgsNotMatched(!exchange.isBound(method.getArguments()));
+ }
}
}
+
}
- else
+ else if (method.hasArguments())
{
- boolean keyMatched =
exchange.isBound(method.getBindingKey());
- result.setKeyNotMatched(!keyMatched);
- if(method.hasArguments())
+ if (queueMatched)
{
- if(keyMatched)
- {
-
result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(),
method.getArguments()));
- }
- else
- {
-
result.setArgsNotMatched(!exchange.isBound(method.getArguments()));
- }
+
result.setArgsNotMatched(!exchange.isBound(method.getArguments(), queue));
+ }
+ else
+ {
+
result.setArgsNotMatched(!exchange.isBound(method.getArguments()));
}
}
}
- else if (method.hasArguments())
+ else if (exchange != null && method.hasBindingKey())
{
- if(queueMatched)
- {
-
result.setArgsNotMatched(!exchange.isBound(method.getArguments(), queue));
- }
- else
- {
-
result.setArgsNotMatched(!exchange.isBound(method.getArguments()));
- }
- }
-
- }
- else if(exchange != null && method.hasBindingKey())
- {
- final boolean keyMatched =
exchange.isBound(method.getBindingKey());
- result.setKeyNotMatched(!keyMatched);
+ final boolean keyMatched =
exchange.isBound(method.getBindingKey());
+ result.setKeyNotMatched(!keyMatched);
- if(method.hasArguments())
- {
- if(keyMatched)
- {
-
result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(),
method.getArguments()));
- }
- else
+ if (method.hasArguments())
{
-
result.setArgsNotMatched(!exchange.isBound(method.getArguments()));
+ if (keyMatched)
+ {
+
result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(),
method.getArguments()));
+ }
+ else
+ {
+
result.setArgsNotMatched(!exchange.isBound(method.getArguments()));
+ }
}
- }
+ }
}
}
@@ -1337,6 +1344,11 @@ public class ServerSessionDelegate exten
}
+ private MessageSource getMessageSource(Session session, String queue)
+ {
+ return getVirtualHost(session).getMessageSource(queue);
+ }
+
private AMQQueue getQueue(Session session, String queue)
{
return getVirtualHost(session).getQueue(queue);
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1677253&r1=1677252&r2=1677253&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
Fri May 1 23:18:23 2015
@@ -265,7 +265,7 @@ public class AMQChannel
}
- private boolean performGet(final AMQQueue queue,
+ private boolean performGet(final MessageSource queue,
final boolean acks)
throws MessageSource.ExistingConsumerPreventsExclusive,
MessageSource.ExistingExclusiveConsumer,
MessageSource.ConsumerAccessRefused
@@ -1463,11 +1463,11 @@ public class AMQChannel
{
private final FlowCreditManager _singleMessageCredit;
- private final AMQQueue _queue;
+ private final MessageSource _queue;
private boolean _deliveredMessage;
public GetDeliveryMethod(final FlowCreditManager singleMessageCredit,
- final AMQQueue queue)
+ final MessageSource queue)
{
_singleMessageCredit = singleMessageCredit;
_queue = queue;
@@ -1478,11 +1478,12 @@ public class AMQChannel
final InstanceProperties props, final long
deliveryTag)
{
_singleMessageCredit.useCreditForMessage(message.getSize());
+ int queueSize = _queue instanceof AMQQueue ?
((AMQQueue)_queue).getQueueDepthMessages() : 0;
long size =
_connection.getProtocolOutputConverter().writeGetOk(message,
props,
AMQChannel.this.getChannelId(),
deliveryTag,
-
_queue.getQueueDepthMessages());
+
queueSize);
_deliveredMessage = true;
return size;
@@ -2074,7 +2075,7 @@ public class AMQChannel
sync();
String queueName = queue == null ? null : queue.asString();
- MessageSource queue1 = queueName == null ? getDefaultQueue() :
vHost.getQueue(queueName);
+ MessageSource queue1 = queueName == null ? getDefaultQueue() :
vHost.getMessageSource(queueName);
final Collection<MessageSource> sources = new HashSet<>();
if (queue1 != null)
{
@@ -2197,7 +2198,7 @@ public class AMQChannel
VirtualHostImpl vHost = _connection.getVirtualHost();
sync();
- AMQQueue queue = queueName == null ? getDefaultQueue() :
vHost.getQueue(queueName.toString());
+ MessageSource queue = queueName == null ? getDefaultQueue() :
vHost.getMessageSource(queueName.toString());
if (queue == null)
{
_logger.info("No queue for '" + queueName + "'");
@@ -2657,7 +2658,7 @@ public class AMQChannel
}
else
{
- AMQQueue queue =
virtualHost.getQueue(queueName.toString());
+ MessageSource queue =
virtualHost.getMessageSource(queueName.toString());
if (queue == null)
{
replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND;
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1677253&r1=1677252&r2=1677253&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
Fri May 1 23:18:23 2015
@@ -509,13 +509,13 @@ public class AMQProtocolEngine implement
QpidProperties.getBuildVersion());
serverProperties.setString(ServerPropertyNames.QPID_INSTANCE_NAME,
_broker.getName());
-
serverProperties.setString(ServerPropertyNames.QPID_TEMPORARY_QUEUE_PREFIX,
- getTemporaryQueuePrefix());
serverProperties.setString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE,
String.valueOf(_closeWhenNoRoute));
serverProperties.setString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED,
String.valueOf(_broker.isMessageCompressionEnabled()));
serverProperties.setString(ConnectionStartProperties.QPID_CONFIRMED_PUBLISH_SUPPORTED,
Boolean.TRUE.toString());
+
serverProperties.setString(ConnectionStartProperties.QPID_VIRTUALHOST_PROPERTIES_SUPPORTED,
String.valueOf(_broker.isVirtualHostPropertiesNodeEnabled()));
+
AMQMethodBody responseBody =
getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(),
(short) pv.getActualMinorVersion(),
@@ -537,27 +537,6 @@ public class AMQProtocolEngine implement
}
}
- private String getTemporaryQueuePrefix()
- {
- String prefix = "";
- if
(_broker.getContextKeys(false).contains(ServerPropertyNames.QPID_TEMPORARY_QUEUE_PREFIX))
- {
- prefix = _broker.getContextValue(String.class,
ServerPropertyNames.QPID_TEMPORARY_QUEUE_PREFIX);
- }
- if (prefix != null)
- {
- if (prefix.length() > 0 && !prefix.endsWith("/"))
- {
- prefix += "/";
- }
- }
- else
- {
- prefix = "";
- }
- return prefix;
- }
-
public synchronized void writeFrame(AMQDataBlock frame)
{
if(_logger.isDebugEnabled())
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1677253&r1=1677252&r2=1677253&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
Fri May 1 23:18:23 2015
@@ -26,9 +26,13 @@ import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
@@ -41,6 +45,8 @@ import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
@@ -78,6 +84,7 @@ import org.apache.qpid.jms.Connection;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.FailoverPolicy;
+import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.URLSyntaxException;
@@ -212,6 +219,9 @@ public class AMQConnection extends Close
private boolean _compressMessages;
private int _messageCompressionThresholdSize;
+ private final Map<String, String> _virtualHostProperties = new HashMap<>();
+ private volatile boolean _virtualHostPropertiesPopulated;
+
static
{
if (_logger.isDebugEnabled())
@@ -735,10 +745,47 @@ public class AMQConnection extends Close
synchronized (_sessionCreationLock)
{
checkNotClosed();
+
+ if(_delegate.isVirtualHostPropertiesSupported() &&
!_virtualHostPropertiesPopulated)
+ {
+ retrieveVirtualHostPropertiesIfNecessary();
+ }
return _delegate.createSession(transacted, acknowledgeMode,
prefetchHigh, prefetchLow);
}
}
+ private void retrieveVirtualHostPropertiesIfNecessary() throws JMSException
+ {
+ synchronized (_virtualHostProperties)
+ {
+ if(!_virtualHostPropertiesPopulated)
+ {
+ final Session session = _delegate.createSession(false,
AMQSession.NO_ACKNOWLEDGE, 1,1);
+ final MessageConsumer consumer =
session.createConsumer(session.createQueue(
+ "ADDR: $virtualhostProperties; {assert: never, create:
never, node:{ type: queue }}"));
+ try
+ {
+ ((AMQSession)session).start();
+ }
+ catch (AMQException e)
+ {
+ throw JMSExceptionHelper.chainJMSException(new
JMSException(
+ "Failed to retrieve virtual host properties"), e);
+ }
+ Message propertiesMessage =
consumer.receive(getProtocolHandler().getDefaultTimeout());
+ if(propertiesMessage != null)
+ {
+ for(String property :
Collections.list((Enumeration<String>) propertiesMessage.getPropertyNames()))
+ {
+ _virtualHostProperties.put(property,
propertiesMessage.getStringProperty(property));
+ }
+ }
+ session.close();
+ _virtualHostPropertiesPopulated = true;
+ }
+ }
+ }
+
public void setFailoverPolicy(FailoverPolicy policy)
{
_failoverPolicy = policy;
@@ -1690,6 +1737,21 @@ public class AMQConnection extends Close
public String getTemporaryQueuePrefix()
{
- return _delegate.getTemporaryQueuePrefix();
+ if(_delegate.isVirtualHostPropertiesSupported())
+ {
+ final String prefix =
getVirtualHostProperty("virtualHost.temporaryQueuePrefix");
+ return prefix == null ? "" : prefix;
+ }
+ else
+ {
+ return "";
+ }
+
}
+
+ String getVirtualHostProperty(final String propertyName)
+ {
+ return _virtualHostProperties.get(propertyName);
+ }
+
}
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=1677253&r1=1677252&r2=1677253&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
Fri May 1 23:18:23 2015
@@ -84,5 +84,7 @@ public interface AMQConnectionDelegate
boolean isMessageCompressionSupported();
- String getTemporaryQueuePrefix();
+ boolean isVirtualHostPropertiesSupported();
+
+
}
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1677253&r1=1677252&r2=1677253&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
Fri May 1 23:18:23 2015
@@ -606,11 +606,9 @@ public class AMQConnectionDelegate_0_10
}
@Override
- public String getTemporaryQueuePrefix()
+ public boolean isVirtualHostPropertiesSupported()
{
- final Map<String, Object> serverProperties =
_qpidConnection.getServerProperties();
- String temporaryQueuePrefix = (String)
serverProperties.get(ServerPropertyNames.QPID_TEMPORARY_QUEUE_PREFIX);
- return (temporaryQueuePrefix == null ? "" : temporaryQueuePrefix);
+ return _qpidConnection.isVirtualHostPropertiesSupported();
}
private class RedirectConnectionException extends ConnectionException
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1677253&r1=1677252&r2=1677253&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
Fri May 1 23:18:23 2015
@@ -76,6 +76,7 @@ public class AMQConnectionDelegate_8_0 i
private boolean _addrSyntaxSupported;
private boolean _confirmedPublishSupported;
private boolean _confirmedPublishNonTransactionalSupported;
+ private boolean _virtualhostPropertiesSupported;
public void closeConnection(long timeout) throws JMSException, AMQException
{
@@ -163,8 +164,15 @@ public class AMQConnectionDelegate_8_0 i
_conn.getFailoverPolicy().attainedConnection();
_conn.setConnected(true);
_conn.logConnected(network.getLocalAddress(),
network.getRemoteAddress());
- _messageCompressionSupported =
checkMessageCompressionSupported();
- _confirmedPublishSupported = checkConfirmedPublishSupported();
+
+ _messageCompressionSupported =
+
checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED);
+
+ _virtualhostPropertiesSupported =
+
checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_VIRTUALHOST_PROPERTIES_SUPPORTED);
+
+ _confirmedPublishSupported =
+
checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_CONFIRMED_PUBLISH_SUPPORTED);
_confirmedPublishNonTransactionalSupported =
checkConfirmedPublishNonTransactionalSupported();
return null;
}
@@ -319,7 +327,7 @@ public class AMQConnectionDelegate_8_0 i
throws AMQException, FailoverException
{
ChannelOpenBody channelOpenBody =
_conn.getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
-
_conn.getProtocolHandler().syncWrite(channelOpenBody.generateFrame(channelId),
ChannelOpenOkBody.class);
+
_conn.getProtocolHandler().syncWrite(channelOpenBody.generateFrame(channelId),
ChannelOpenOkBody.class);
if (transacted)
{
@@ -486,20 +494,11 @@ public class AMQConnectionDelegate_8_0 i
return connectedToQpid;
}
- private boolean checkMessageCompressionSupported()
- {
- FieldTable serverProperties =
_conn.getProtocolHandler().getProtocolSession().getConnectionStartServerProperties();
- return serverProperties != null
- &&
Boolean.parseBoolean(serverProperties.getString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED));
-
- }
-
- private boolean checkConfirmedPublishSupported()
+ private boolean checkBooleanConnectionStartProperty(final String property)
{
FieldTable serverProperties =
_conn.getProtocolHandler().getProtocolSession().getConnectionStartServerProperties();
return serverProperties != null
- &&
Boolean.parseBoolean(serverProperties.getString(ConnectionStartProperties.QPID_CONFIRMED_PUBLISH_SUPPORTED));
-
+ && Boolean.parseBoolean(serverProperties.getString(property));
}
public boolean isMessageCompressionSupported()
@@ -508,13 +507,12 @@ public class AMQConnectionDelegate_8_0 i
}
@Override
- public String getTemporaryQueuePrefix()
+ public boolean isVirtualHostPropertiesSupported()
{
- FieldTable serverProperties =
_conn.getProtocolHandler().getProtocolSession().getConnectionStartServerProperties();
- String temporaryQueuePrefix =
serverProperties.getString(ServerPropertyNames.QPID_TEMPORARY_QUEUE_PREFIX);
- return (temporaryQueuePrefix == null ? "" : temporaryQueuePrefix);
+ return _virtualhostPropertiesSupported;
}
+
public boolean isAddrSyntaxSupported()
{
return _addrSyntaxSupported;
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1677253&r1=1677252&r2=1677253&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
Fri May 1 23:18:23 2015
@@ -327,8 +327,8 @@ public abstract class AMQSession<C exten
protected AMQSession(AMQConnection con, int channelId, boolean transacted,
int acknowledgeMode,
MessageFactoryRegistry messageFactoryRegistry, int
defaultPrefetchHighMark, int defaultPrefetchLowMark)
{
- _useAMQPEncodedMapMessage = con == null ? true :
!con.isUseLegacyMapMessageFormat();
- _useAMQPEncodedStreamMessage = con == null ? false :
!con.isUseLegacyStreamMessageFormat();
+ _useAMQPEncodedMapMessage = con == null ||
!con.isUseLegacyMapMessageFormat();
+ _useAMQPEncodedStreamMessage = con != null &&
!con.isUseLegacyStreamMessageFormat();
_strictAMQP =
Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP,
STRICT_AMQP_DEFAULT));
_strictAMQPFATAL =
Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL,
STRICT_AMQP_FATAL_DEFAULT));
@@ -443,7 +443,12 @@ public abstract class AMQSession<C exten
AMQSession(AMQConnection con, int channelId, boolean transacted, int
acknowledgeMode, int defaultPrefetchHigh,
int defaultPrefetchLow)
{
- this(con, channelId, transacted, acknowledgeMode,
MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh,
+ this(con,
+ channelId,
+ transacted,
+ acknowledgeMode,
+ MessageFactoryRegistry.newDefaultRegistry(),
+ defaultPrefetchHigh,
defaultPrefetchLow);
}
@@ -3134,7 +3139,9 @@ public abstract class AMQSession<C exten
private void resubscribeProducers() throws AMQException
{
ArrayList producers = new ArrayList(_producers.values());
- _logger.debug(MessageFormat.format("Resubscribing producers = {0}
producers.size={1}", producers, producers.size())); // FIXME: removeKey
+ _logger.debug(MessageFormat.format("Resubscribing producers = {0}
producers.size={1}",
+ producers,
+ producers.size())); // FIXME:
removeKey
for (Iterator it = producers.iterator(); it.hasNext();)
{
P producer = (P) it.next();
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1677253&r1=1677252&r2=1677253&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
Fri May 1 23:18:23 2015
@@ -56,6 +56,7 @@ import org.apache.qpid.client.messaging.
import org.apache.qpid.client.messaging.address.Link;
import org.apache.qpid.client.messaging.address.Node;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1677253&r1=1677252&r2=1677253&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
Fri May 1 23:18:23 2015
@@ -961,4 +961,9 @@ public class AMQProtocolHandler implemen
_decoder = new ClientDecoder(_protocolSession.getMethodProcessor());
_protocolSession.init(settings);
}
+
+ public long getDefaultTimeout()
+ {
+ return DEFAULT_SYNC_TIMEOUT;
+ }
}
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java?rev=1677253&r1=1677252&r2=1677253&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
Fri May 1 23:18:23 2015
@@ -40,6 +40,7 @@ public class ConnectionStartProperties
public static final String QPID_MESSAGE_COMPRESSION_SUPPORTED =
"qpid.message_compression_supported";
+ public static final String QPID_VIRTUALHOST_PROPERTIES_SUPPORTED =
"qpid.virtualhost_properties_supported";
public static final String CLIENT_ID_0_10 = "clientName";
public static final String CLIENT_ID_0_8 = "instance";
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1677253&r1=1677252&r2=1677253&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java
Fri May 1 23:18:23 2015
@@ -81,6 +81,7 @@ public class Connection extends Connecti
private FrameSizeObserver _frameSizeObserver;
private boolean _messageCompressionSupported;
private final AtomicBoolean _redirecting = new AtomicBoolean();
+ private boolean _virtualHostPropertiesSupported;
public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD,
RESUMING }
@@ -701,6 +702,8 @@ public class Connection extends Connecti
{
_serverProperties = serverProperties == null ? Collections.<String,
Object>emptyMap() : serverProperties;
_messageCompressionSupported =
Boolean.parseBoolean(String.valueOf(_serverProperties.get(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED)));
+ _virtualHostPropertiesSupported =
Boolean.parseBoolean(String.valueOf(_serverProperties.get(ConnectionStartProperties.QPID_VIRTUALHOST_PROPERTIES_SUPPORTED)));
+
}
public Map<String, Object> getServerProperties()
@@ -860,6 +863,11 @@ public class Connection extends Connecti
return _messageCompressionSupported;
}
+ public boolean isVirtualHostPropertiesSupported()
+ {
+ return _virtualHostPropertiesSupported;
+ }
+
public boolean isRedirecting()
{
return _redirecting.get();
Modified: qpid/java/trunk/pom.xml
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/pom.xml?rev=1677253&r1=1677252&r2=1677253&view=diff
==============================================================================
--- qpid/java/trunk/pom.xml (original)
+++ qpid/java/trunk/pom.xml Fri May 1 23:18:23 2015
@@ -251,6 +251,7 @@
<virtualhostnode.type>${profile.virtualhostnode.type}</virtualhostnode.type>
<virtualhostnode.context.blueprint>${profile.virtualhostnode.context.blueprint}</virtualhostnode.context.blueprint>
<broker.clean.between.tests>${profile.broker.clean.between.tests}</broker.clean.between.tests>
+ <qpid.globalAddressDomains>[]</qpid.globalAddressDomains>
<!-- This must be a child of qpid home currently due to the
horrible mechanics of QBTC -->
<test.output>${qpid.home.qbtc.output}</test.output>
@@ -409,7 +410,7 @@
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1","AMQP_0_10"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>false</profile.broker.persistent>
<profile.virtualhostnode.type>Memory</profile.virtualhostnode.type>
-
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore"}</profile.virtualhostnode.context.blueprint>
+
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
</properties>
</profile>
@@ -428,7 +429,7 @@
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>false</profile.broker.persistent>
<profile.virtualhostnode.type>Memory</profile.virtualhostnode.type>
-
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore"}</profile.virtualhostnode.context.blueprint>
+
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
</properties>
</profile>
@@ -447,7 +448,7 @@
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>false</profile.broker.persistent>
<profile.virtualhostnode.type>Memory</profile.virtualhostnode.type>
-
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore"}</profile.virtualhostnode.context.blueprint>
+
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
</properties>
</profile>
@@ -466,7 +467,7 @@
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1","AMQP_0_10"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>BDB</profile.virtualhostnode.type>
-
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore"}</profile.virtualhostnode.context.blueprint>
+
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
</properties>
</profile>
@@ -485,7 +486,7 @@
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>BDB</profile.virtualhostnode.type>
-
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore"}</profile.virtualhostnode.context.blueprint>
+
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
</properties>
</profile>
@@ -504,7 +505,7 @@
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>BDB</profile.virtualhostnode.type>
-
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore"}</profile.virtualhostnode.context.blueprint>
+
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
</properties>
</profile>
@@ -523,7 +524,7 @@
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1","AMQP_0_10"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>DERBY</profile.virtualhostnode.type>
-
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore"}</profile.virtualhostnode.context.blueprint>
+
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
</properties>
</profile>
@@ -542,7 +543,7 @@
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>DERBY</profile.virtualhostnode.type>
-
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore"}</profile.virtualhostnode.context.blueprint>
+
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
</properties>
</profile>
@@ -561,7 +562,7 @@
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>DERBY</profile.virtualhostnode.type>
-
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore"}</profile.virtualhostnode.context.blueprint>
+
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
</properties>
</profile>
@@ -580,7 +581,7 @@
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1","AMQP_0_10"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>DERBY</profile.virtualhostnode.type>
-
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore"}</profile.virtualhostnode.context.blueprint>
+
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
<profile.test_receive_timeout>2000</profile.test_receive_timeout>
</properties>
</profile>
@@ -600,7 +601,7 @@
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>DERBY</profile.virtualhostnode.type>
-
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore"}</profile.virtualhostnode.context.blueprint>
+
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
<profile.test_receive_timeout>2000</profile.test_receive_timeout>
</properties>
</profile>
@@ -620,7 +621,7 @@
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>DERBY</profile.virtualhostnode.type>
-
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore"}</profile.virtualhostnode.context.blueprint>
+
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
<profile.test_receive_timeout>2000</profile.test_receive_timeout>
</properties>
</profile>
@@ -645,7 +646,7 @@
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>JSON</profile.virtualhostnode.type>
-
<profile.virtualhostnode.context.blueprint>{"type":"DERBY","storePath":"${dollar.sign}{json:QPID_WORK}${dollar.sign}{json:file.separator}${dollar.sign}{this:name}${dollar.sign}{json:file.separator}derby"}</profile.virtualhostnode.context.blueprint>
+
<profile.virtualhostnode.context.blueprint>{"type":"DERBY","storePath":"${dollar.sign}{json:QPID_WORK}${dollar.sign}{json:file.separator}${dollar.sign}{this:name}${dollar.sign}{json:file.separator}derby","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
</properties>
</profile>
@@ -664,7 +665,7 @@
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1","AMQP_0_10"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>JSON</profile.virtualhostnode.type>
-
<profile.virtualhostnode.context.blueprint>{"type":"DERBY","storePath":"${dollar.sign}{QPID_WORK}/${dollar.sign}{this:name}/derby"}</profile.virtualhostnode.context.blueprint>
+
<profile.virtualhostnode.context.blueprint>{"type":"DERBY","storePath":"${dollar.sign}{json:QPID_WORK}${dollar.sign}{json:file.separator}${dollar.sign}{this:name}${dollar.sign}{json:file.separator}derby","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
</properties>
</profile>
Modified:
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java?rev=1677253&r1=1677252&r2=1677253&view=diff
==============================================================================
---
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
(original)
+++
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
Fri May 1 23:18:23 2015
@@ -1286,6 +1286,8 @@ public class QpidBrokerTestCase extends
copySystemProperty("test.port.alt.ssl", jvmOptions);
copySystemProperty("test.amqp_port_protocols", jvmOptions);
+ copySystemProperty("qpid.globalAddressDomains", jvmOptions);
+
copySystemProperty("virtualhostnode.type", jvmOptions);
copySystemProperty("virtualhostnode.context.blueprint",
jvmOptions);
}
Modified:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/TemporaryQueuePrefixTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/TemporaryQueuePrefixTest.java?rev=1677253&r1=1677252&r2=1677253&view=diff
==============================================================================
---
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/TemporaryQueuePrefixTest.java
(original)
+++
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/TemporaryQueuePrefixTest.java
Fri May 1 23:18:23 2015
@@ -29,8 +29,15 @@ import javax.jms.TemporaryQueue;
public class TemporaryQueuePrefixTest extends QpidBrokerTestCase
{
+ @Override
+ public void setUp() throws Exception
+ {
+ // deliberately don't call setup
+ }
+
public void testNoPrefixSet() throws Exception
{
+ super.setUp();
Connection connection = getConnection();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
TemporaryQueue queue = session.createTemporaryQueue();
@@ -41,8 +48,9 @@ public class TemporaryQueuePrefixTest ex
public void testEmptyPrefix() throws Exception
{
- String prefix = "";
- setTestSystemProperty(ServerPropertyNames.QPID_TEMPORARY_QUEUE_PREFIX,
prefix);
+ String prefix = "[]";
+ setTestSystemProperty("qpid.globalAddressDomains", prefix);
+ super.setUp();
Connection connection = getConnection();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
TemporaryQueue queue = session.createTemporaryQueue();
@@ -51,23 +59,26 @@ public class TemporaryQueuePrefixTest ex
connection.close();
}
- public void testPrefixWithSlash() throws Exception
+ public void testTwoDomains() throws Exception
{
- String prefix = "testPrefix/";
- setTestSystemProperty(ServerPropertyNames.QPID_TEMPORARY_QUEUE_PREFIX,
prefix);
+ final String primaryPrefix = "/testPrefix";
+ String prefix = "[ \\\"" + primaryPrefix + "\\\", \\\"/foo\\\" ]";
+ setTestSystemProperty("qpid.globalAddressDomains", prefix);
+ super.setUp();
Connection connection = getConnection();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
TemporaryQueue queue = session.createTemporaryQueue();
assertFalse(queue.getQueueName() + " has superfluous slash in
prefix.", queue.getQueueName().startsWith(prefix + "/"));
- assertTrue(queue.getQueueName() + " does not start with expected
prefix \"" + prefix + "\".", queue.getQueueName().startsWith(prefix));
+ assertTrue(queue.getQueueName() + " does not start with expected
prefix \"" + primaryPrefix + "\".",
queue.getQueueName().startsWith(primaryPrefix));
connection.close();
}
public void testPrefix() throws Exception
{
- String prefix = "testPrefix";
- setTestSystemProperty(ServerPropertyNames.QPID_TEMPORARY_QUEUE_PREFIX,
prefix);
+ String prefix = "/testPrefix";
+ setTestSystemProperty("qpid.globalAddressDomains", prefix);
+ super.setUp();
Connection connection = getConnection();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
TemporaryQueue queue = session.createTemporaryQueue();
Modified:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/ChannelLoggingTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/ChannelLoggingTest.java?rev=1677253&r1=1677252&r2=1677253&view=diff
==============================================================================
---
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/ChannelLoggingTest.java
(original)
+++
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/ChannelLoggingTest.java
Fri May 1 23:18:23 2015
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.server.virtualhost.VirtualHostPropertiesNodeCreator;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
@@ -37,8 +38,15 @@ public class ChannelLoggingTest extends
private static final String CHANNEL_CLOSE_FORCED_MESSAGE_PATTERN =
"CHN-1003 : Close : \\d* - .*";
private static final String CHANNEL_PREFIX = "CHN-";
- // No explicit startup configuration is required for this test
- // so no setUp() method
+ @Override
+ public void setUp() throws Exception
+ {
+
+ // disable the virtualhostPropertiesNode as it messes with the logging
since it causes the client to
+ // create a session and then it sends a message
+ setTestSystemProperty("qpid.plugin.disabled:systemnodecreator."+
VirtualHostPropertiesNodeCreator.TYPE, "true");
+ super.setUp();
+ }
/**
* Description:
@@ -365,4 +373,4 @@ public class ChannelLoggingTest extends
assertEquals("Channel IDs should be the same",
getChannelID(fromActor(open)), getChannelID(fromSubject(close)));
assertEquals("Connection IDs should be the same",
getConnectionID(fromActor(open)), getConnectionID(fromSubject(close)));
}
-}
\ No newline at end of file
+}
Modified:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/management/jmx/StatisticsTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/management/jmx/StatisticsTest.java?rev=1677253&r1=1677252&r2=1677253&view=diff
==============================================================================
---
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/management/jmx/StatisticsTest.java
(original)
+++
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/management/jmx/StatisticsTest.java
Fri May 1 23:18:23 2015
@@ -31,6 +31,7 @@ import org.apache.qpid.client.AMQSession
import org.apache.qpid.management.common.mbeans.ManagedBroker;
import org.apache.qpid.management.common.mbeans.ManagedConnection;
import org.apache.qpid.management.common.mbeans.ServerInformation;
+import org.apache.qpid.server.virtualhost.VirtualHostPropertiesNodeCreator;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -53,6 +54,9 @@ public class StatisticsTest extends Qpid
@Override
public void setUp() throws Exception
{
+ // disable the virtualhostPropertiesNode as it messes with the
statistics counts since causes the client to
+ // create a session and then it sends a message
+ setTestSystemProperty("qpid.plugin.disabled:systemnodecreator."+
VirtualHostPropertiesNodeCreator.TYPE, "true");
createTestVirtualHostNode(0, TEST_VIRTUALHOST1);
createTestVirtualHostNode(0, TEST_VIRTUALHOST2);
Modified:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/ConnectionRestTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/ConnectionRestTest.java?rev=1677253&r1=1677252&r2=1677253&view=diff
==============================================================================
---
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/ConnectionRestTest.java
(original)
+++
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/ConnectionRestTest.java
Fri May 1 23:18:23 2015
@@ -37,6 +37,7 @@ import org.apache.qpid.server.model.Brok
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Session;
+import org.apache.qpid.server.virtualhost.VirtualHostPropertiesNodeCreator;
public class ConnectionRestTest extends QpidRestTestCase
{
@@ -53,6 +54,10 @@ public class ConnectionRestTest extends
public void setUp() throws Exception
{
+ // disable the virtualhostPropertiesNode as it messes with the
statistics counts since causes the client to
+ // create a session and then it sends a message
+ setTestSystemProperty("qpid.plugin.disabled:systemnodecreator."+
VirtualHostPropertiesNodeCreator.TYPE, "true");
+
super.setUp();
_connection = getConnection();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]