Author: kwall
Date: Mon Sep 28 17:58:02 2015
New Revision: 1705736
URL: http://svn.apache.org/viewvc?rev=1705736&view=rev
Log:
QPID-6744/QPID-6759: [Java Client/Java Broker] 0-8..0-91 Correct handling of
queue.delete and exchange.delete.
* Make client's use of the queue.delete nowait flag in line with the
expectation for a response.
* Correct client's response class expectation when performing an exchange
delete (previously awaited an ExchangeDeclareOkBody!)
* Make broker send a queue.delete/exchange.delete response only when nowait =
false
* For backward compatibility, when the peer is identified as an older Qpid
client, send a queue delete response regardless. Identification is performed
by checking the product name and product version; the latter can be overridden,
which may be useful for private implementations.
Added:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/session/ExchangeDeleteTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/session/QueueDeleteTest.java
- copied, changed from r1705735,
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/session/QueueDeclareTest.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/Session.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/session/QueueDeclareTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java?rev=1705736&r1=1705735&r2=1705736&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
Mon Sep 28 17:58:02 2015
@@ -21,8 +21,6 @@
package org.apache.qpid.server.model;
import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.EventLoggerProvider;
@@ -139,6 +137,10 @@ public interface Broker<X extends Broker
@ManagedContextDefault(name = MESSAGE_COMPRESSION_THRESHOLD_SIZE)
int DEFAULT_MESSAGE_COMPRESSION_THRESHOLD_SIZE = 102400;
+ String CONNECTION_QUEUE_DELETE_NOWAIT_VERSION_REGEXP =
"connection.queueDeleteNoWaitVersionRegexp";
+ @ManagedContextDefault(name =
CONNECTION_QUEUE_DELETE_NOWAIT_VERSION_REGEXP)
+ String DEFAULT_CONNECTION_QUEUE_DELETE_NOWAIT_VERSION_REGEXP = "^0\\..*$";
+
String BROKER_DIRECT_BYTE_BUFFER_POOL_SIZE =
"broker.directByteBufferPoolSize";
@ManagedContextDefault(name = BROKER_DIRECT_BYTE_BUFFER_POOL_SIZE)
int DEFAULT_BROKER_DIRECT_BYTE_BUFFER_POOL_SIZE = 1024;
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1705736&r1=1705735&r2=1705736&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
Mon Sep 28 17:58:02 2015
@@ -3106,7 +3106,10 @@ public class AMQChannel
ExchangeDeleteOkBody responseBody =
_connection.getMethodRegistry().createExchangeDeleteOkBody();
-
_connection.writeFrame(responseBody.generateFrame(getChannelId()));
+ if (!nowait)
+ {
+
_connection.writeFrame(responseBody.generateFrame(getChannelId()));
+ }
}
catch (ExchangeIsAlternateException e)
{
@@ -3485,9 +3488,12 @@ public class AMQChannel
{
int purged = virtualHost.removeQueue(queue);
- MethodRegistry methodRegistry =
_connection.getMethodRegistry();
- QueueDeleteOkBody responseBody =
methodRegistry.createQueueDeleteOkBody(purged);
-
_connection.writeFrame(responseBody.generateFrame(getChannelId()));
+ if (!nowait ||
_connection.isSendQueueDeleteOkRegardless())
+ {
+ MethodRegistry methodRegistry =
_connection.getMethodRegistry();
+ QueueDeleteOkBody responseBody =
methodRegistry.createQueueDeleteOkBody(purged);
+
_connection.writeFrame(responseBody.generateFrame(getChannelId()));
+ }
}
catch (AccessControlException e)
{
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1705736&r1=1705735&r2=1705736&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
Mon Sep 28 17:58:02 2015
@@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
import javax.security.auth.Subject;
import javax.security.sasl.SaslException;
@@ -161,6 +162,15 @@ public class AMQPConnection_0_8
private volatile boolean _closeWhenNoRoute;
private boolean _compressionSupported;
private int _messageCompressionThreshold;
+
+ /**
+ * QPID-6744 - Older Qpid clients (<=0.32) incorrectly set the nowait
flag true on the queue.delete method
+ * and then await regardless. If we detect an old Qpid client, we send
the queue.delete-ok response
+ * regardless of the queue.delete flag request made by the client.
+ */
+ private volatile boolean _sendQueueDeleteOkRegardless = true;
+ private final Pattern _connectionQueueDeleteNoWaitVerRegexp;
+
private int _currentClassId;
private int _currentMethodId;
private int _binaryDataLimit;
@@ -183,6 +193,9 @@ public class AMQPConnection_0_8
_binaryDataLimit =
getBroker().getContextKeys(false).contains(BROKER_DEBUG_BINARY_DATA_LENGTH)
? getBroker().getContextValue(Integer.class,
BROKER_DEBUG_BINARY_DATA_LENGTH)
: DEFAULT_DEBUG_BINARY_DATA_LENGTH;
+ String queueDeleteNoWaitRegexp =
getBroker().getContextKeys(false).contains(Broker.CONNECTION_QUEUE_DELETE_NOWAIT_VERSION_REGEXP)
+ ? getBroker().getContextValue(String.class,
Broker.CONNECTION_QUEUE_DELETE_NOWAIT_VERSION_REGEXP): "";
+ _connectionQueueDeleteNoWaitVerRegexp =
Pattern.compile(queueDeleteNoWaitRegexp);
int maxMessageSize = port.getContextValue(Integer.class,
AmqpPort.PORT_MAX_MESSAGE_SIZE);
_maxMessageSize = (maxMessageSize > 0) ? (long) maxMessageSize :
Long.MAX_VALUE;
@@ -680,7 +693,7 @@ public class AMQPConnection_0_8
@Override
public String toString()
{
- return _network.getRemoteAddress() + "(" + (getAuthorizedPrincipal()
== null ? "?" : getAuthorizedPrincipal().getName() + ")");
+ return _network.getRemoteAddress() + "(" + ((getAuthorizedPrincipal()
== null ? "?" : getAuthorizedPrincipal().getName()) + ")");
}
private String getLocalFQDN()
@@ -706,6 +719,16 @@ public class AMQPConnection_0_8
_saslServer = saslServer;
}
+ public boolean isSendQueueDeleteOkRegardless()
+ {
+ return _sendQueueDeleteOkRegardless;
+ }
+
+ void setSendQueueDeleteOkRegardless(boolean sendQueueDeleteOkRegardless)
+ {
+ _sendQueueDeleteOkRegardless = sendQueueDeleteOkRegardless;
+ }
+
private void setClientProperties(FieldTable clientProperties)
{
if (clientProperties != null)
@@ -714,26 +737,35 @@ public class AMQPConnection_0_8
if (closeWhenNoRoute != null)
{
_closeWhenNoRoute = Boolean.parseBoolean(closeWhenNoRoute);
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Client set closeWhenNoRoute=" +
_closeWhenNoRoute + " for protocol engine " + this);
- }
+ _logger.debug("Client set closeWhenNoRoute={} for connection
{}", _closeWhenNoRoute, this);
}
String compressionSupported =
clientProperties.getString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED);
if (compressionSupported != null)
{
_compressionSupported =
Boolean.parseBoolean(compressionSupported);
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Client set compressionSupported=" +
_compressionSupported + " for protocol engine " + this);
- }
+ _logger.debug("Client set compressionSupported={} for
connection {}", _compressionSupported, this);
}
String clientId =
clientProperties.getString(ConnectionStartProperties.CLIENT_ID_0_8);
+ String clientVersion =
clientProperties.getString(ConnectionStartProperties.VERSION_0_8);
+ String clientProduct =
clientProperties.getString(ConnectionStartProperties.PRODUCT);
+ String remoteProcessPid =
clientProperties.getString(ConnectionStartProperties.PID);
+
+ boolean mightBeQpidClient = clientProduct == null ||
+
clientProduct.toLowerCase().contains("qpid") ||
+
clientProduct.toLowerCase().equals("unknown");
+ boolean sendQueueDeleteOkRegardless = mightBeQpidClient &&
(clientVersion == null ||
_connectionQueueDeleteNoWaitVerRegexp.matcher(clientVersion).matches());
+
+ setSendQueueDeleteOkRegardless(sendQueueDeleteOkRegardless);
+ if (sendQueueDeleteOkRegardless)
+ {
+ _logger.debug("Peer is an older Qpid client, queue delete-ok
response will be sent"
+ + " regardless for connection {}", this);
+ }
-
setClientVersion(clientProperties.getString(ConnectionStartProperties.VERSION_0_8));
-
setClientProduct(clientProperties.getString(ConnectionStartProperties.PRODUCT));
-
setRemoteProcessPid(clientProperties.getString(ConnectionStartProperties.PID));
+ setClientVersion(clientVersion);
+ setClientProduct(clientProduct);
+ setRemoteProcessPid(remoteProcessPid);
setClientId(clientId == null ? UUID.randomUUID().toString() :
clientId);
getEventLogger().message(ConnectionMessages.OPEN(getClientId(),
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1705736&r1=1705735&r2=1705736&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
Mon Sep 28 17:58:02 2015
@@ -1588,6 +1588,28 @@ public abstract class AMQSession<C exten
declareExchange(name, type, nowait, false, false, false);
}
+ @Override
+ public void deleteExchange(final String exchangeName) throws JMSException
+ {
+ try
+ {
+ new FailoverRetrySupport<>(new FailoverProtectedOperation<Object,
QpidException>()
+ {
+ public Object execute() throws QpidException, FailoverException
+ {
+ sendExchangeDelete(exchangeName, false);
+ return null;
+ }
+ }, _connection).execute();
+ }
+ catch (QpidException e)
+ {
+ throw toJMSException("The exchange deletion failed: " +
e.getMessage(), e);
+ }
+ }
+
+ abstract void sendExchangeDelete(final String name, final boolean nowait)
throws QpidException, FailoverException;
+
abstract public void sync() throws QpidException;
public int getAcknowledgeMode()
@@ -2918,9 +2940,9 @@ public abstract class AMQSession<C exten
* @param queueName The name of the queue to delete.
*
* @throws JMSException If the queue could not be deleted for any reason.
- * TODO Be aware of possible changes to parameter order as versions
change.
*/
- protected void deleteQueue(final String queueName) throws JMSException
+ @Override
+ public void deleteQueue(final String queueName) throws JMSException
{
try
{
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1705736&r1=1705735&r2=1705736&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Mon Sep 28 17:58:02 2015
@@ -801,8 +801,6 @@ public class AMQSession_0_10 extends AMQ
public void sendQueueDelete(final String queueName) throws QpidException,
FailoverException
{
getQpidSession().queueDelete(queueName);
- // ifEmpty --> false
- // ifUnused --> false
// We need to sync so that we get notify of an error.
sync();
}
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1705736&r1=1705735&r2=1705736&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
Mon Sep 28 17:58:02 2015
@@ -636,13 +636,16 @@ public class AMQSession_0_8 extends AMQS
getProtocolHandler().syncWrite(exchangeDeclare,
ExchangeDeclareOkBody.class);
}
- public void sendExchangeDelete(final String name) throws QpidException,
FailoverException
+ @Override
+ public void sendExchangeDelete(final String name, boolean nowait) throws
QpidException, FailoverException
{
+ //The 'nowait' parameter is only used on the 0-10 path, it is ignored
on the 0-8/0-9/0-9-1 path
+
ExchangeDeleteBody body =
getMethodRegistry().createExchangeDeleteBody(getTicket(),
name, false, false);
- AMQFrame exchangeDeclare = body.generateFrame(getChannelId());
+ AMQFrame exchangeDelete = body.generateFrame(getChannelId());
- getProtocolHandler().syncWrite(exchangeDeclare,
ExchangeDeclareOkBody.class);
+ getProtocolHandler().syncWrite(exchangeDelete,
ExchangeDeleteOkBody.class);
}
private void sendQueueDeclare(final AMQDestination amqd, boolean passive)
throws QpidException, FailoverException
@@ -679,7 +682,7 @@ public class AMQSession_0_8 extends AMQS
protected String declareQueue(final AMQDestination amqd, final boolean
noLocal,
final boolean nowait, final boolean passive)
throws QpidException
{
- //The 'noWait' parameter is only used on the 0-10 path, it is ignored
on the 0-8/0-9/0-9-1 path
+ //The 'nowait' parameter is only used on the 0-10 path, it is ignored
on the 0-8/0-9/0-9-1 path
final AMQProtocolHandler protocolHandler = getProtocolHandler();
return new FailoverNoopSupport<String, QpidException>(
@@ -706,7 +709,7 @@ public class AMQSession_0_8 extends AMQS
queueName,
false,
false,
- true);
+
false);
AMQFrame queueDeleteFrame = body.generateFrame(getChannelId());
getProtocolHandler().syncWrite(queueDeleteFrame,
QueueDeleteOkBody.class);
@@ -1280,12 +1283,11 @@ public class AMQSession_0_8 extends AMQS
{
if (isExchangeExist(dest,false))
{
-
new FailoverNoopSupport<Object, QpidException>(new
FailoverProtectedOperation<Object, QpidException>()
{
public Object execute() throws QpidException,
FailoverException
{
- sendExchangeDelete(dest.getAddressName());
+ sendExchangeDelete(dest.getAddressName(), false);
return null;
}
}, getAMQConnection()).execute();
@@ -1296,7 +1298,6 @@ public class AMQSession_0_8 extends AMQS
{
if (isQueueExist(dest,false))
{
-
new FailoverNoopSupport<Object, QpidException>(new
FailoverProtectedOperation<Object, QpidException>()
{
public Object execute() throws QpidException,
FailoverException
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/Session.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/Session.java?rev=1705736&r1=1705735&r2=1705736&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/Session.java
(original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/Session.java Mon
Sep 28 17:58:02 2015
@@ -83,7 +83,7 @@ public interface Session extends TopicSe
/**
* Create a producer
- * @param destination
+ * @param destination
* @param immediate the value of the immediate flag used by default on the
producer
* @return
* @throws JMSException
@@ -100,4 +100,24 @@ public interface Session extends TopicSe
String getTemporaryQueueExchangeName();
ListMessage createListMessage() throws JMSException;
+
+ /**
+ * Deletes the queue identified by the given name. If the queue does not
exist
+ * a JMSException with error code 404 will be thrown and this Session will
be closed.
+ *
+ * @param queueName name of the queue
+ *
+ * @throws JMSException
+ */
+ void deleteQueue(String queueName) throws JMSException;
+
+ /**
+ * Deletes the exchange identified by the given name. If the exchange does
not exist
+ * a JMSException with error code 404 will be thrown and this Session will
be closed.
+ *
+ * @param exchangeName name of the exchange
+ *
+ * @throws JMSException
+ */
+ void deleteExchange(String exchangeName) throws JMSException;
}
Added:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/session/ExchangeDeleteTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/session/ExchangeDeleteTest.java?rev=1705736&view=auto
==============================================================================
---
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/session/ExchangeDeleteTest.java
(added)
+++
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/session/ExchangeDeleteTest.java
Mon Sep 28 17:58:02 2015
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.client.session;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class ExchangeDeleteTest extends QpidBrokerTestCase
+{
+ private Connection _connection;
+ private AMQSession<?, ?> _session;
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ // Turn off queue declare side effect of creating consumer
+
setTestClientSystemProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME,
"false");
+
setTestClientSystemProperty(ClientProperties.QPID_DECLARE_EXCHANGES_PROP_NAME,
"false");
+
setTestClientSystemProperty(ClientProperties.QPID_BIND_QUEUES_PROP_NAME,
"false");
+
+ _connection = getConnection();
+ _connection.start();
+ _session = (AMQSession<?, ?>) _connection.createSession(true,
Session.SESSION_TRANSACTED);
+ }
+
+ public void testDeleteExchange() throws Exception
+ {
+ String exchangeName = getTestName();
+
+ _session.declareExchange(exchangeName,
ExchangeDefaults.DIRECT_EXCHANGE_CLASS, false);
+
+ _session.deleteExchange(exchangeName);
+ }
+
+ public void testDeleteNonExistentExchange() throws Exception
+ {
+ String exchangeName = getTestName();
+
+ try
+ {
+ _session.deleteExchange(exchangeName);
+ fail("Exception not thrown");
+ }
+ catch (JMSException e)
+ {
+ // PASS
+ assertEquals("Expecting exchange not found",
+ String.valueOf(AMQConstant.NOT_FOUND.getCode()),
+ e.getErrorCode());
+ }
+
+ assertTrue("Session expected to be closed", _session.isClosed());
+
+ }
+
+}
Modified:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/session/QueueDeclareTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/session/QueueDeclareTest.java?rev=1705736&r1=1705735&r2=1705736&view=diff
==============================================================================
---
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/session/QueueDeclareTest.java
(original)
+++
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/session/QueueDeclareTest.java
Mon Sep 28 17:58:02 2015
@@ -64,7 +64,7 @@ public class QueueDeclareTest extends Qp
AMQDestination durable = (AMQDestination)
_session.createQueue(String.format(format, getTestQueueName(), true));
AMQDestination nondurable = (AMQDestination)
_session.createQueue(String.format(format, getTestQueueName(), false));
- verifyDurabiltyIgnoreIfQueueExists(durable, nondurable);
+ verifyDurabilityIgnoreIfQueueExists(durable, nondurable);
}
public void testDeclareIgnoresDurableFlagIfNonDurableQueueAlreadyExists()
throws Exception
@@ -73,10 +73,11 @@ public class QueueDeclareTest extends Qp
AMQDestination nondurable = (AMQDestination)
_session.createQueue(String.format(format, getTestQueueName(), false));
AMQDestination durable = (AMQDestination)
_session.createQueue(String.format(format, getTestQueueName(), true));
- verifyDurabiltyIgnoreIfQueueExists(nondurable, durable);
+ verifyDurabilityIgnoreIfQueueExists(nondurable, durable);
}
- private void verifyDurabiltyIgnoreIfQueueExists(final AMQDestination
firstDeclare, final AMQDestination secondDeclare) throws Exception
+ private void verifyDurabilityIgnoreIfQueueExists(final AMQDestination
firstDeclare,
+ final AMQDestination
secondDeclare) throws Exception
{
_session.declareAndBind(firstDeclare);
Copied:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/session/QueueDeleteTest.java
(from r1705735,
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/session/QueueDeclareTest.java)
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/session/QueueDeleteTest.java?p2=qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/session/QueueDeleteTest.java&p1=qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/session/QueueDeclareTest.java&r1=1705735&r2=1705736&rev=1705736&view=diff
==============================================================================
---
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/session/QueueDeclareTest.java
(original)
+++
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/session/QueueDeleteTest.java
Mon Sep 28 17:58:02 2015
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -22,15 +21,18 @@ package org.apache.qpid.client.session;
import javax.jms.Connection;
import javax.jms.Destination;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
-public class QueueDeclareTest extends QpidBrokerTestCase
+public class QueueDeleteTest extends QpidBrokerTestCase
{
private Connection _connection;
private AMQSession<?, ?> _session;
@@ -38,54 +40,66 @@ public class QueueDeclareTest extends Qp
{
super.setUp();
+ // Turn off queue declare side effect of creating consumer
+
setTestClientSystemProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME,
"false");
+
setTestClientSystemProperty(ClientProperties.QPID_DECLARE_EXCHANGES_PROP_NAME,
"false");
+
setTestClientSystemProperty(ClientProperties.QPID_BIND_QUEUES_PROP_NAME,
"false");
+
_connection = getConnection();
_connection.start();
_session = (AMQSession<?, ?>) _connection.createSession(true,
Session.SESSION_TRANSACTED);
}
- public void testDeclareAndBindWhenQueueIsNotSpecifiedInDestinationUrl()
throws Exception
+ public void testDeleteQueue() throws Exception
{
- AMQDestination destination = (AMQDestination)
_session.createQueue("topic://amq.topic//?routingkey='testTopic'");
-
- assertEquals("Non empty queue name unexpectedly generated by parser :
" + destination.getAMQQueueName(), "", destination.getAMQQueueName());
+ AMQDestination destination = (AMQDestination)
_session.createQueue(String.format("direct://amq.direct//%s",
getTestQueueName()));
_session.declareAndBind(destination);
- assertFalse("Non empty queue name should have been generated by
declareAndBind",
- "".equals(destination.getAMQQueueName()));
+ sendMessage(_session, destination, 2);
- sendMessage(_session, destination, 1);
receiveMessage(destination);
- }
- public void testDeclareIgnoresNonDurableFlagIfDurableQueueAlreadyExists()
throws Exception
- {
- String format = "direct://amq.direct//%s?durable='%s'";
- AMQDestination durable = (AMQDestination)
_session.createQueue(String.format(format, getTestQueueName(), true));
- AMQDestination nondurable = (AMQDestination)
_session.createQueue(String.format(format, getTestQueueName(), false));
+ _session.deleteQueue(destination.getQueueName());
- verifyDurabiltyIgnoreIfQueueExists(durable, nondurable);
- }
-
- public void testDeclareIgnoresDurableFlagIfNonDurableQueueAlreadyExists()
throws Exception
- {
- String format = "direct://amq.direct//%s?durable='%s'";
- AMQDestination nondurable = (AMQDestination)
_session.createQueue(String.format(format, getTestQueueName(), false));
- AMQDestination durable = (AMQDestination)
_session.createQueue(String.format(format, getTestQueueName(), true));
+ // Trying to consume from a queue that does not exist will cause an
exception
+ try
+ {
+ _session.createConsumer(destination);
+ fail("Exception not thrown");
+ }
+ catch (JMSException e)
+ {
+ // PASS
+ assertEquals("Expecting queue not found",
+ String.valueOf(AMQConstant.NOT_FOUND.getCode()),
+ e.getErrorCode());
+ }
- verifyDurabiltyIgnoreIfQueueExists(nondurable, durable);
+ assertTrue("Session expected to be closed", _session.isClosed());
}
- private void verifyDurabiltyIgnoreIfQueueExists(final AMQDestination
firstDeclare, final AMQDestination secondDeclare) throws Exception
+ public void testDeleteNonExistentQueue() throws Exception
{
- _session.declareAndBind(firstDeclare);
+ AMQDestination destination = (AMQDestination)
_session.createQueue(String.format("direct://amq.direct//%s",
getTestQueueName()));
- sendMessage(_session, firstDeclare, 1);
+ try
+ {
+ _session.deleteQueue(destination.getQueueName());
+ fail("Exception not thrown");
+ }
+ catch (JMSException e)
+ {
+ // PASS
+ assertEquals("Expecting queue not found",
+ String.valueOf(AMQConstant.NOT_FOUND.getCode()),
+ e.getErrorCode());
+ }
- _session.declareAndBind(secondDeclare);
- receiveMessage(secondDeclare);
+ assertTrue("Session expected to be closed", _session.isClosed());
}
+
private void receiveMessage(final Destination destination) throws Exception
{
MessageConsumer consumer = _session.createConsumer(destination);
Modified:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java?rev=1705736&r1=1705735&r2=1705736&view=diff
==============================================================================
---
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java
(original)
+++
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java
Mon Sep 28 17:58:02 2015
@@ -30,14 +30,7 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
-import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQSession_0_10;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ExchangeDeleteBody;
-import org.apache.qpid.framing.ExchangeDeleteOkBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.ProtocolVersion;
/**
* Exchange
@@ -183,23 +176,9 @@ public class ExchangeLoggingTest extends
_monitor.markDiscardPoint();
//create the exchange by creating a consumer
- _session.createConsumer(_queue);
-
- //now delete the exchange
- if(isBroker010())
- {
- ((AMQSession_0_10) _session).sendExchangeDelete(_name, false);
- }
- else
- {
- MethodRegistry registry = new MethodRegistry(ProtocolVersion.v0_8);
-
- ExchangeDeleteBody body = registry.createExchangeDeleteBody(0,
_name, false, true);
+ _session.createConsumer(_queue).close();
- AMQFrame exchangeDeclare =
body.generateFrame(((AMQSession)_session).getChannelId());
-
- ((AMQConnection)
_connection).getProtocolHandler().syncWrite(exchangeDeclare,
ExchangeDeleteOkBody.class);
- }
+ ((AMQSession) _session).deleteExchange(_name);
//Wait and ensure we get our last EXH-1002 msg
waitForMessage("EXH-1002");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]