Author: kwall
Date: Fri Jan 20 17:13:49 2017
New Revision: 1779653
URL: http://svn.apache.org/viewvc?rev=1779653&view=rev
Log:
QPID-7546: [System Tests] Enable QueueMessageDurabilityTest on AMQP 1-0
persistent profiles
Modified:
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/JmsProvider.java
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xProvider.java
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java
Modified:
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/JmsProvider.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/JmsProvider.java?rev=1779653&r1=1779652&r2=1779653&view=diff
==============================================================================
---
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/JmsProvider.java
(original)
+++
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/JmsProvider.java
Fri Jan 20 17:13:49 2017
@@ -90,5 +90,7 @@ public interface JmsProvider
long getQueueDepth(Connection con, Queue destination) throws Exception;
+ boolean isQueueExist(Connection con, Queue destination) throws Exception;
+
String getBrokerDetailsFromDefaultConnectionUrl();
}
Modified:
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java?rev=1779653&r1=1779652&r2=1779653&view=diff
==============================================================================
---
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
(original)
+++
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
Fri Jan 20 17:13:49 2017
@@ -383,6 +383,11 @@ public class QpidBrokerTestCase extends
return _jmsProvider.getQueueDepth(con, destination);
}
+ public boolean isQueueExist(final Connection con, final Queue destination)
throws Exception
+ {
+ return _jmsProvider.isQueueExist(con, destination);
+ }
+
/**
* Send messages to the given destination.
* <p/>
Modified:
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xProvider.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xProvider.java?rev=1779653&r1=1779652&r2=1779653&view=diff
==============================================================================
---
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xProvider.java
(original)
+++
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xProvider.java
Fri Jan 20 17:13:49 2017
@@ -220,7 +220,7 @@ public class QpidJmsClient0xProvider imp
@Override
public Queue getQueueFromName(Session session, String name) throws
JMSException
{
- return session.createQueue("ADDR: '" + name + "'");
+ return new AMQQueue("", name);
}
@Override
@@ -280,6 +280,21 @@ public class QpidJmsClient0xProvider imp
}
finally
{
+ session.close();
+ }
+ }
+
+ @Override
+ public boolean isQueueExist(final Connection con, final Queue destination)
throws Exception
+ {
+ Queue queue = new AMQQueue("", destination.getQueueName());
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ try
+ {
+ return ((AMQSession<?, ?>) session).isQueueBound((AMQDestination)
queue);
+ }
+ finally
+ {
session.close();
}
}
Modified:
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java?rev=1779653&r1=1779652&r2=1779653&view=diff
==============================================================================
---
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java
(original)
+++
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java
Fri Jan 20 17:13:49 2017
@@ -381,6 +381,52 @@ public class QpidJmsClientProvider imple
}
@Override
+ public boolean isQueueExist(final Connection con, final Queue destination)
throws Exception
+ {
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ try
+ {
+ MessageProducer producer =
session.createProducer(session.createQueue("$management"));
+ final TemporaryQueue responseQ = session.createTemporaryQueue();
+ MessageConsumer consumer = session.createConsumer(responseQ);
+ MapMessage message = session.createMapMessage();
+ message.setStringProperty("index", "object-path");
+ final String escapedName =
destination.getQueueName().replaceAll("([/\\\\])", "\\\\$1");
+ message.setStringProperty("key", escapedName);
+ message.setStringProperty("type", "org.apache.qpid.Queue");
+ message.setStringProperty("operation", "READ");
+
+ message.setJMSReplyTo(responseQ);
+
+ producer.send(message);
+
+ Message response = consumer.receive();
+ try
+ {
+ int statusCode = response.getIntProperty("statusCode");
+ switch(statusCode)
+ {
+ case 200:
+ return true;
+ case 404:
+ return false;
+ default:
+ throw new RuntimeException(String.format("Unexpected
response for queue query '%s' : %d", destination.getQueueName(), statusCode));
+ }
+ }
+ finally
+ {
+ consumer.close();
+ responseQ.delete();
+ }
+ }
+ finally
+ {
+ session.close();
+ }
+ }
+
+ @Override
public Connection getConnectionWithSyncPublishing() throws Exception
{
return getConnection();
Modified:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java?rev=1779653&r1=1779652&r2=1779653&view=diff
==============================================================================
---
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java
(original)
+++
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java
Fri Jan 20 17:13:49 2017
@@ -25,161 +25,143 @@ import java.util.Map;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.naming.NamingException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
public class QueueMessageDurabilityTest extends QpidBrokerTestCase
{
-
- private static final String QPID_MESSAGE_DURABILITY =
"qpid.message_durability";
private static final String DURABLE_ALWAYS_PERSIST_NAME =
"DURABLE_QUEUE_ALWAYS_PERSIST";
private static final String DURABLE_NEVER_PERSIST_NAME =
"DURABLE_QUEUE_NEVER_PERSIST";
private static final String DURABLE_DEFAULT_PERSIST_NAME =
"DURABLE_QUEUE_DEFAULT_PERSIST";
private static final String NONDURABLE_ALWAYS_PERSIST_NAME =
"NONDURABLE_QUEUE_ALWAYS_PERSIST";
+ private Queue _durableAlwaysPersist;
+ private Queue _durableNeverPersist;
+ private Queue _durableDefaultPersist;
+ private Queue _nonDurableAlwaysPersist;
+ private String _topicNameFormat;
@Override
public void setUp() throws Exception
{
super.setUp();
- Connection conn = getConnection();
+ Connection conn = createStartedConnection();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- AMQSession amqSession = (AMQSession) session;
Map<String,Object> arguments = new HashMap<>();
- arguments.put(QPID_MESSAGE_DURABILITY,
MessageDurability.ALWAYS.name());
- amqSession.createQueue(DURABLE_ALWAYS_PERSIST_NAME, false, true,
false, arguments);
+ arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_DURABILITY,
MessageDurability.ALWAYS.name());
+ arguments.put(org.apache.qpid.server.model.Queue.DURABLE, true);
+ _durableAlwaysPersist = createQueueWithArguments(session,
DURABLE_ALWAYS_PERSIST_NAME, arguments);
arguments = new HashMap<>();
- arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.NEVER.name());
- amqSession.createQueue(DURABLE_NEVER_PERSIST_NAME, false, true, false,
arguments);
+ arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_DURABILITY,
MessageDurability.NEVER.name());
+ arguments.put(org.apache.qpid.server.model.Queue.DURABLE, true);
+ _durableNeverPersist = createQueueWithArguments(session,
DURABLE_NEVER_PERSIST_NAME, arguments);
arguments = new HashMap<>();
- arguments.put(QPID_MESSAGE_DURABILITY,
MessageDurability.DEFAULT.name());
- amqSession.createQueue(DURABLE_DEFAULT_PERSIST_NAME, false, true,
false, arguments);
+ arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_DURABILITY,
MessageDurability.DEFAULT.name());
+ arguments.put(org.apache.qpid.server.model.Queue.DURABLE, true);
+ _durableDefaultPersist = createQueueWithArguments(session,
DURABLE_DEFAULT_PERSIST_NAME, arguments);
arguments = new HashMap<>();
- arguments.put(QPID_MESSAGE_DURABILITY,MessageDurability.ALWAYS.name());
- amqSession.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME, false, false,
false, arguments);
+
arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_DURABILITY,MessageDurability.ALWAYS.name());
+ arguments.put(org.apache.qpid.server.model.Queue.DURABLE, false);
+ _nonDurableAlwaysPersist = createQueueWithArguments(session,
NONDURABLE_ALWAYS_PERSIST_NAME, arguments);
+
+ bindQueue(session, "amq.topic", DURABLE_ALWAYS_PERSIST_NAME,
"Y.*.*.*");
+ bindQueue(session, "amq.topic", DURABLE_NEVER_PERSIST_NAME, "*.Y.*.*");
+ bindQueue(session, "amq.topic", DURABLE_DEFAULT_PERSIST_NAME,
"*.*.Y.*");
+ bindQueue(session, "amq.topic", NONDURABLE_ALWAYS_PERSIST_NAME,
"*.*.*.Y");
+
+ _topicNameFormat = isBroker10() ? "amq.topic/%s" : "%s";
- amqSession.bindQueue(DURABLE_ALWAYS_PERSIST_NAME,
- "Y.*.*.*",
- null,
- ExchangeDefaults.TOPIC_EXCHANGE_NAME,
- null);
-
- amqSession.bindQueue(DURABLE_NEVER_PERSIST_NAME,
- "*.Y.*.*",
- null,
- ExchangeDefaults.TOPIC_EXCHANGE_NAME,
- null);
-
- amqSession.bindQueue(DURABLE_DEFAULT_PERSIST_NAME,
- "*.*.Y.*",
- null,
- ExchangeDefaults.TOPIC_EXCHANGE_NAME,
- null);
-
- amqSession.bindQueue(NONDURABLE_ALWAYS_PERSIST_NAME,
- "*.*.*.Y",
- null,
- ExchangeDefaults.TOPIC_EXCHANGE_NAME,
- null);
+ conn.close();
}
public void testSendPersistentMessageToAll() throws Exception
{
- Connection conn = getConnection();
+ Connection conn = createStartedConnection();
Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(null);
conn.start();
- producer.send(session.createTopic("Y.Y.Y.Y"),
session.createTextMessage("test"));
+ producer.send(session.createTopic(String.format(_topicNameFormat,
"Y.Y.Y.Y")), session.createTextMessage("test"));
session.commit();
- AMQSession amqSession = (AMQSession) session;
-
assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
-
assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
-
assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
-
assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+ assertEquals(1, getQueueDepth(conn, _durableAlwaysPersist));
+ assertEquals(1, getQueueDepth(conn, _durableNeverPersist));
+ assertEquals(1, getQueueDepth(conn, _durableDefaultPersist));
+ assertEquals(1, getQueueDepth(conn,_nonDurableAlwaysPersist));
restartDefaultBroker();
- conn = getConnection();
- session = conn.createSession(true, Session.SESSION_TRANSACTED);
- amqSession = (AMQSession) session;
-
assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
-
assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
-
assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
-
- assertFalse(amqSession.isQueueBound((AMQDestination)
session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+ conn = createStartedConnection();
+ assertEquals(1, getQueueDepth(conn, _durableAlwaysPersist));
+ assertEquals(0, getQueueDepth(conn, _durableNeverPersist));
+ assertEquals(1, getQueueDepth(conn, _durableDefaultPersist));
+ assertFalse(isQueueExist(conn, _nonDurableAlwaysPersist));
}
-
public void testSendNonPersistentMessageToAll() throws Exception
{
- Connection conn = getConnection();
+ Connection conn = createStartedConnection();
Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(null);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
conn.start();
- producer.send(session.createTopic("Y.Y.Y.Y"),
session.createTextMessage("test"));
+ producer.send(session.createTopic(String.format(_topicNameFormat,
"Y.Y.Y.Y")), session.createTextMessage("test"));
session.commit();
- AMQSession amqSession = (AMQSession) session;
-
assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
-
assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
-
assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
-
assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+ assertEquals(1, getQueueDepth(conn, _durableAlwaysPersist));
+ assertEquals(1, getQueueDepth(conn, _durableNeverPersist));
+ assertEquals(1, getQueueDepth(conn, _durableDefaultPersist));
+ assertEquals(1, getQueueDepth(conn,_nonDurableAlwaysPersist));
restartDefaultBroker();
- conn = getConnection();
- session = conn.createSession(true, Session.SESSION_TRANSACTED);
- amqSession = (AMQSession) session;
-
assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
-
assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
-
assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+ conn = createStartedConnection();
+ assertEquals(1, getQueueDepth(conn, _durableAlwaysPersist));
+ assertEquals(0, getQueueDepth(conn, _durableNeverPersist));
+ assertEquals(0, getQueueDepth(conn, _durableDefaultPersist));
-
assertFalse(amqSession.isQueueBound((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+ assertFalse(isQueueExist(conn, _nonDurableAlwaysPersist));
}
public void testNonPersistentContentRetained() throws Exception
{
- Connection conn = getConnection();
+ Connection conn = createStartedConnection();
Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(null);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
conn.start();
- producer.send(session.createTopic("N.N.Y.Y"),
session.createTextMessage("test1"));
- producer.send(session.createTopic("Y.N.Y.Y"),
session.createTextMessage("test2"));
+ producer.send(session.createTopic(String.format(_topicNameFormat,
"N.N.Y.Y")), session.createTextMessage("test1"));
+ producer.send(session.createTopic(String.format(_topicNameFormat,
"Y.N.Y.Y")), session.createTextMessage("test2"));
session.commit();
- MessageConsumer consumer =
session.createConsumer(session.createQueue(DURABLE_ALWAYS_PERSIST_NAME));
- Message msg = consumer.receive(1000l);
+ MessageConsumer consumer =
session.createConsumer(_durableAlwaysPersist);
+ Message msg = consumer.receive(getReceiveTimeout());
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
assertEquals("test2", ((TextMessage) msg).getText());
session.rollback();
restartDefaultBroker();
- conn = getConnection();
- conn.start();
+ conn = createStartedConnection();
session = conn.createSession(true, Session.SESSION_TRANSACTED);
- AMQSession amqSession = (AMQSession) session;
- assertEquals(1, amqSession.getQueueDepth((AMQDestination)
session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
-
assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
-
assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
- consumer =
session.createConsumer(session.createQueue(DURABLE_ALWAYS_PERSIST_NAME));
- msg = consumer.receive(1000l);
+ assertEquals(1, getQueueDepth(conn, _durableAlwaysPersist));
+ assertEquals(0, getQueueDepth(conn, _durableNeverPersist));
+ assertEquals(0, getQueueDepth(conn, _durableDefaultPersist));
+
+ consumer = session.createConsumer(_durableAlwaysPersist);
+ msg = consumer.receive(getReceiveTimeout());
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
assertEquals("test2", ((TextMessage)msg).getText());
@@ -189,27 +171,51 @@ public class QueueMessageDurabilityTest
public void testPersistentContentRetainedOnTransientQueue() throws
Exception
{
setTestClientSystemProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME,
"false");
- Connection conn = getConnection();
+ Connection conn = createStartedConnection();
Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(null);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
conn.start();
- producer.send(session.createTopic("N.N.Y.Y"),
session.createTextMessage("test1"));
+ producer.send(session.createTopic(String.format(_topicNameFormat,
"N.N.Y.Y")), session.createTextMessage("test1"));
session.commit();
- MessageConsumer consumer =
session.createConsumer(session.createQueue(DURABLE_DEFAULT_PERSIST_NAME));
- Message msg = consumer.receive(1000l);
+ MessageConsumer consumer =
session.createConsumer(_durableDefaultPersist);
+ Message msg = consumer.receive(getReceiveTimeout());
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
assertEquals("test1", ((TextMessage)msg).getText());
session.commit();
System.gc();
- consumer =
session.createConsumer(session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME));
- msg = consumer.receive(1000l);
+ consumer = session.createConsumer(_nonDurableAlwaysPersist);
+ msg = consumer.receive(getReceiveTimeout());
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
assertEquals("test1", ((TextMessage)msg).getText());
session.commit();
}
+ private Connection createStartedConnection() throws JMSException,
NamingException
+ {
+ Connection conn = getConnection();
+ conn.start();
+ return conn;
+ }
+
+ private Queue createQueueWithArguments(final Session session,
+ final String testQueueName,
+ final Map<String, Object>
arguments) throws Exception
+ {
+ createEntityUsingAmqpManagement(testQueueName, session,
"org.apache.qpid.Queue", arguments);
+ return getQueueFromName(session, testQueueName);
+ }
+
+ private void bindQueue(final Session session, final String exchange, final
String queueName,
+ final String bindingKey) throws Exception
+ {
+
+ final Map<String, Object> arguments = new HashMap<>();
+ arguments.put("destination", queueName);
+ arguments.put("bindingKey", bindingKey);
+ performOperationUsingAmqpManagement(exchange, "bind", session,
"org.apache.qpid.TopicExchange", arguments);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org