Author: rgodfrey
Date: Fri May 1 10:24:39 2015
New Revision: 1677106
URL: http://svn.apache.org/r1677106
Log:
QPID-6525 : Add connection property specifying a temporary queue prefix
(applied patch from Lorenz Quack)
Added:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/TemporaryQueuePrefixTest.java
(with props)
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1677106&r1=1677105&r2=1677106&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
Fri May 1 10:24:39 2015
@@ -113,11 +113,33 @@ public class ServerConnectionDelegate ex
map.put(ServerPropertyNames.VERSION,
QpidProperties.getReleaseVersion());
map.put(ServerPropertyNames.QPID_BUILD,
QpidProperties.getBuildVersion());
map.put(ServerPropertyNames.QPID_INSTANCE_NAME, broker.getName());
+ map.put(ServerPropertyNames.QPID_TEMPORARY_QUEUE_PREFIX,
getTemporaryQueuePrefix(broker));
map.put(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED,
String.valueOf(broker.isMessageCompressionEnabled()));
return map;
}
+ private static String getTemporaryQueuePrefix(final Broker<?> broker)
+ {
+ String prefix = "";
+ if
(broker.getContextKeys(false).contains(ServerPropertyNames.QPID_TEMPORARY_QUEUE_PREFIX))
+ {
+ prefix = broker.getContextValue(String.class,
ServerPropertyNames.QPID_TEMPORARY_QUEUE_PREFIX);
+ }
+ if (prefix != null)
+ {
+ if (prefix.length() > 0 && !prefix.endsWith("/"))
+ {
+ prefix += "/";
+ }
+ }
+ else
+ {
+ prefix = "";
+ }
+ return prefix;
+ }
+
public ServerSession getSession(Connection conn, SessionAttach atc)
{
SessionDelegate serverSessionDelegate = new ServerSessionDelegate();
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1677106&r1=1677105&r2=1677106&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
Fri May 1 10:24:39 2015
@@ -509,6 +509,8 @@ public class AMQProtocolEngine implement
QpidProperties.getBuildVersion());
serverProperties.setString(ServerPropertyNames.QPID_INSTANCE_NAME,
_broker.getName());
+
serverProperties.setString(ServerPropertyNames.QPID_TEMPORARY_QUEUE_PREFIX,
+ getTemporaryQueuePrefix());
serverProperties.setString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE,
String.valueOf(_closeWhenNoRoute));
serverProperties.setString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED,
@@ -535,8 +537,26 @@ public class AMQProtocolEngine implement
}
}
-
-
+ private String getTemporaryQueuePrefix()
+ {
+ String prefix = "";
+ if
(_broker.getContextKeys(false).contains(ServerPropertyNames.QPID_TEMPORARY_QUEUE_PREFIX))
+ {
+ prefix = _broker.getContextValue(String.class,
ServerPropertyNames.QPID_TEMPORARY_QUEUE_PREFIX);
+ }
+ if (prefix != null)
+ {
+ if (prefix.length() > 0 && !prefix.endsWith("/"))
+ {
+ prefix += "/";
+ }
+ }
+ else
+ {
+ prefix = "";
+ }
+ return prefix;
+ }
public synchronized void writeFrame(AMQDataBlock frame)
{
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1677106&r1=1677105&r2=1677106&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
Fri May 1 10:24:39 2015
@@ -1688,4 +1688,8 @@ public class AMQConnection extends Close
}
+ public String getTemporaryQueuePrefix()
+ {
+ return _delegate.getTemporaryQueuePrefix();
+ }
}
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=1677106&r1=1677105&r2=1677106&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
Fri May 1 10:24:39 2015
@@ -83,4 +83,6 @@ public interface AMQConnectionDelegate
boolean supportsIsBound();
boolean isMessageCompressionSupported();
+
+ String getTemporaryQueuePrefix();
}
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1677106&r1=1677105&r2=1677106&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
Fri May 1 10:24:39 2015
@@ -605,6 +605,14 @@ public class AMQConnectionDelegate_0_10
return _qpidConnection.isMessageCompressionSupported();
}
+ @Override
+ public String getTemporaryQueuePrefix()
+ {
+ final Map<String, Object> serverProperties =
_qpidConnection.getServerProperties();
+ String temporaryQueuePrefix = (String)
serverProperties.get(ServerPropertyNames.QPID_TEMPORARY_QUEUE_PREFIX);
+ return (temporaryQueuePrefix == null ? "" : temporaryQueuePrefix);
+ }
+
private class RedirectConnectionException extends ConnectionException
{
private final String _host;
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1677106&r1=1677105&r2=1677106&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
Fri May 1 10:24:39 2015
@@ -24,10 +24,7 @@ import java.net.ConnectException;
import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException;
import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.Iterator;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -510,6 +507,14 @@ public class AMQConnectionDelegate_8_0 i
return _messageCompressionSupported;
}
+ @Override
+ public String getTemporaryQueuePrefix()
+ {
+ FieldTable serverProperties =
_conn.getProtocolHandler().getProtocolSession().getConnectionStartServerProperties();
+ String temporaryQueuePrefix =
serverProperties.getString(ServerPropertyNames.QPID_TEMPORARY_QUEUE_PREFIX);
+ return (temporaryQueuePrefix == null ? "" : temporaryQueuePrefix);
+ }
+
public boolean isAddrSyntaxSupported()
{
return _addrSyntaxSupported;
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1677106&r1=1677105&r2=1677106&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
Fri May 1 10:24:39 2015
@@ -23,12 +23,7 @@ package org.apache.qpid.client;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
@@ -42,6 +37,7 @@ import java.util.concurrent.locks.Reentr
import javax.jms.*;
import javax.jms.IllegalStateException;
+import javax.jms.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -3240,6 +3236,13 @@ public abstract class AMQSession<C exten
return dispatcher == null ? null : dispatcher._lock;
}
+ public String createTemporaryQueueName()
+ {
+ String prefix = _connection.getTemporaryQueuePrefix();
+ assert(prefix.isEmpty() || prefix.endsWith("/"));
+ return prefix + "TempQueue" + UUID.randomUUID();
+ }
+
public interface Dispatchable
{
void dispatch(AMQSession ssn);
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1677106&r1=1677105&r2=1677106&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Fri May 1 10:24:39 2015
@@ -766,7 +766,7 @@ public class AMQSession_0_10 extends AMQ
if (amqd.getAMQQueueName() == null)
{
// generate a name for this queue
- queueName = new AMQShortString("TempQueue" + UUID.randomUUID());
+ queueName = new AMQShortString(createTemporaryQueueName());
amqd.setQueueName(queueName);
}
else
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java?rev=1677106&r1=1677105&r2=1677106&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
Fri May 1 10:24:39 2015
@@ -36,7 +36,7 @@ final class AMQTemporaryQueue extends AM
/** Create a new instance of an AMQTemporaryQueue */
public AMQTemporaryQueue(AMQSession session)
{
- super(session.getTemporaryQueueExchangeName(), new
AMQShortString("TempQueue" + UUID.randomUUID()), true);
+ super(session.getTemporaryQueueExchangeName(), new
AMQShortString(session.createTemporaryQueueName()), true);
_session = session;
}
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java?rev=1677106&r1=1677105&r2=1677106&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java
Fri May 1 10:24:39 2015
@@ -36,6 +36,11 @@ public final class ServerPropertyNames
public static final String FEDERATION_TAG = "qpid.federation_tag";
/**
+ * Server property: prefix used for federation of temporary queues
+ */
+ public static final String QPID_TEMPORARY_QUEUE_PREFIX =
"qpid.temporary_queue_prefix";
+
+ /**
* Server property: array of features supported by the server.
*/
public static final String QPID_FEATURES = "qpid.features";
Added:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/TemporaryQueuePrefixTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/TemporaryQueuePrefixTest.java?rev=1677106&view=auto
==============================================================================
---
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/TemporaryQueuePrefixTest.java
(added)
+++
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/TemporaryQueuePrefixTest.java
Fri May 1 10:24:39 2015
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.client;
+
+import org.apache.qpid.common.ServerPropertyNames;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+
+
+public class TemporaryQueuePrefixTest extends QpidBrokerTestCase
+{
+ public void testNoPrefixSet() throws Exception
+ {
+ Connection connection = getConnection();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ TemporaryQueue queue = session.createTemporaryQueue();
+
+ assertTrue(queue.getQueueName() + " does not start with
\"TempQueue\".", queue.getQueueName().startsWith("TempQueue"));
+ connection.close();
+ }
+
+ public void testEmptyPrefix() throws Exception
+ {
+ String prefix = "";
+ setTestSystemProperty(ServerPropertyNames.QPID_TEMPORARY_QUEUE_PREFIX,
prefix);
+ Connection connection = getConnection();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ TemporaryQueue queue = session.createTemporaryQueue();
+
+ assertTrue(queue.getQueueName() + " does not start with
\"TempQueue\".", queue.getQueueName().startsWith("TempQueue"));
+ connection.close();
+ }
+
+ public void testPrefixWithSlash() throws Exception
+ {
+ String prefix = "testPrefix/";
+ setTestSystemProperty(ServerPropertyNames.QPID_TEMPORARY_QUEUE_PREFIX,
prefix);
+ Connection connection = getConnection();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ TemporaryQueue queue = session.createTemporaryQueue();
+
+ assertFalse(queue.getQueueName() + " has superfluous slash in
prefix.", queue.getQueueName().startsWith(prefix + "/"));
+ assertTrue(queue.getQueueName() + " does not start with expected
prefix \"" + prefix + "\".", queue.getQueueName().startsWith(prefix));
+ connection.close();
+ }
+
+ public void testPrefix() throws Exception
+ {
+ String prefix = "testPrefix";
+ setTestSystemProperty(ServerPropertyNames.QPID_TEMPORARY_QUEUE_PREFIX,
prefix);
+ Connection connection = getConnection();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ TemporaryQueue queue = session.createTemporaryQueue();
+
+ assertTrue(queue.getQueueName() + " does not start with expected
prefix \"" + prefix + "/\".", queue.getQueueName().startsWith(prefix + "/"));
+ connection.close();
+ }
+}
Propchange:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/TemporaryQueuePrefixTest.java
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]