Author: kwall
Date: Fri Jan 20 17:13:49 2017
New Revision: 1779653

URL: http://svn.apache.org/viewvc?rev=1779653&view=rev
Log:
QPID-7546: [System Tests] Enable QueueMessageDurabilityTest on AMQP 1-0 persistent profiles

Modified:
    qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/JmsProvider.java
    qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
    qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xProvider.java
    qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java

Modified: qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/JmsProvider.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/JmsProvider.java?rev=1779653&r1=1779652&r2=1779653&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/JmsProvider.java (original)
+++ qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/JmsProvider.java Fri Jan 20 17:13:49 2017
@@ -90,5 +90,7 @@ public interface JmsProvider
 
     long getQueueDepth(Connection con, Queue destination) throws Exception;
 
+    boolean isQueueExist(Connection con, Queue destination) throws Exception;
+
     String getBrokerDetailsFromDefaultConnectionUrl();
 }

Modified: qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java?rev=1779653&r1=1779652&r2=1779653&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java (original)
+++ qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java Fri Jan 20 17:13:49 2017
@@ -383,6 +383,11 @@ public class QpidBrokerTestCase extends
         return _jmsProvider.getQueueDepth(con, destination);
     }
 
+    public boolean isQueueExist(final Connection con, final Queue destination) 
throws Exception
+    {
+        return _jmsProvider.isQueueExist(con, destination);
+    }
+
     /**
      * Send messages to the given destination.
      * <p/>

Modified: qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xProvider.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xProvider.java?rev=1779653&r1=1779652&r2=1779653&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xProvider.java (original)
+++ qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xProvider.java Fri Jan 20 17:13:49 2017
@@ -220,7 +220,7 @@ public class QpidJmsClient0xProvider imp
     @Override
     public Queue getQueueFromName(Session session, String name) throws 
JMSException
     {
-        return session.createQueue("ADDR: '" + name + "'");
+        return new AMQQueue("", name);
     }
 
     @Override
@@ -280,6 +280,21 @@ public class QpidJmsClient0xProvider imp
         }
         finally
         {
+            session.close();
+        }
+    }
+
+    @Override
+    public boolean isQueueExist(final Connection con, final Queue destination) 
throws Exception
+    {
+        Queue queue = new AMQQueue("", destination.getQueueName());
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        try
+        {
+            return ((AMQSession<?, ?>) session).isQueueBound((AMQDestination) 
queue);
+        }
+        finally
+        {
             session.close();
         }
     }

Modified: qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java?rev=1779653&r1=1779652&r2=1779653&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java (original)
+++ qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java Fri Jan 20 17:13:49 2017
@@ -381,6 +381,52 @@ public class QpidJmsClientProvider imple
     }
 
     @Override
+    public boolean isQueueExist(final Connection con, final Queue destination) 
throws Exception
+    {
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        try
+        {
+            MessageProducer producer = 
session.createProducer(session.createQueue("$management"));
+            final TemporaryQueue responseQ = session.createTemporaryQueue();
+            MessageConsumer consumer = session.createConsumer(responseQ);
+            MapMessage message = session.createMapMessage();
+            message.setStringProperty("index", "object-path");
+            final String escapedName = 
destination.getQueueName().replaceAll("([/\\\\])", "\\\\$1");
+            message.setStringProperty("key", escapedName);
+            message.setStringProperty("type", "org.apache.qpid.Queue");
+            message.setStringProperty("operation", "READ");
+
+            message.setJMSReplyTo(responseQ);
+
+            producer.send(message);
+
+            Message response = consumer.receive();
+            try
+            {
+                int statusCode = response.getIntProperty("statusCode");
+                switch(statusCode)
+                {
+                    case 200:
+                        return true;
+                    case 404:
+                        return false;
+                    default:
+                        throw new RuntimeException(String.format("Unexpected 
response for queue query '%s' :  %d", destination.getQueueName(), statusCode));
+                }
+            }
+            finally
+            {
+                consumer.close();
+                responseQ.delete();
+            }
+        }
+        finally
+        {
+            session.close();
+        }
+    }
+
+    @Override
     public Connection getConnectionWithSyncPublishing() throws Exception
     {
         return getConnection();

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java?rev=1779653&r1=1779652&r2=1779653&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java Fri Jan 20 17:13:49 2017
@@ -25,161 +25,143 @@ import java.util.Map;
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.naming.NamingException;
 
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 public class QueueMessageDurabilityTest extends QpidBrokerTestCase
 {
-
-    private static final String QPID_MESSAGE_DURABILITY = 
"qpid.message_durability";
     private static final String DURABLE_ALWAYS_PERSIST_NAME = 
"DURABLE_QUEUE_ALWAYS_PERSIST";
     private static final String DURABLE_NEVER_PERSIST_NAME = 
"DURABLE_QUEUE_NEVER_PERSIST";
     private static final String DURABLE_DEFAULT_PERSIST_NAME = 
"DURABLE_QUEUE_DEFAULT_PERSIST";
     private static final String NONDURABLE_ALWAYS_PERSIST_NAME = 
"NONDURABLE_QUEUE_ALWAYS_PERSIST";
+    private Queue _durableAlwaysPersist;
+    private Queue _durableNeverPersist;
+    private Queue _durableDefaultPersist;
+    private Queue _nonDurableAlwaysPersist;
+    private String _topicNameFormat;
 
     @Override
     public void setUp() throws Exception
     {
         super.setUp();
-        Connection conn = getConnection();
+        Connection conn = createStartedConnection();
         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        AMQSession amqSession = (AMQSession) session;
 
         Map<String,Object> arguments = new HashMap<>();
-        arguments.put(QPID_MESSAGE_DURABILITY, 
MessageDurability.ALWAYS.name());
-        amqSession.createQueue(DURABLE_ALWAYS_PERSIST_NAME, false, true, 
false, arguments);
+        arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_DURABILITY, 
MessageDurability.ALWAYS.name());
+        arguments.put(org.apache.qpid.server.model.Queue.DURABLE, true);
+        _durableAlwaysPersist = createQueueWithArguments(session, 
DURABLE_ALWAYS_PERSIST_NAME, arguments);
 
         arguments = new HashMap<>();
-        arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.NEVER.name());
-        amqSession.createQueue(DURABLE_NEVER_PERSIST_NAME, false, true, false, 
arguments);
+        arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_DURABILITY, 
MessageDurability.NEVER.name());
+        arguments.put(org.apache.qpid.server.model.Queue.DURABLE, true);
+        _durableNeverPersist = createQueueWithArguments(session, 
DURABLE_NEVER_PERSIST_NAME, arguments);
 
         arguments = new HashMap<>();
-        arguments.put(QPID_MESSAGE_DURABILITY, 
MessageDurability.DEFAULT.name());
-        amqSession.createQueue(DURABLE_DEFAULT_PERSIST_NAME, false, true, 
false, arguments);
+        arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_DURABILITY, 
MessageDurability.DEFAULT.name());
+        arguments.put(org.apache.qpid.server.model.Queue.DURABLE, true);
+        _durableDefaultPersist = createQueueWithArguments(session, 
DURABLE_DEFAULT_PERSIST_NAME, arguments);
 
         arguments = new HashMap<>();
-        arguments.put(QPID_MESSAGE_DURABILITY,MessageDurability.ALWAYS.name());
-        amqSession.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME, false, false, 
false, arguments);
+        
arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_DURABILITY,MessageDurability.ALWAYS.name());
+        arguments.put(org.apache.qpid.server.model.Queue.DURABLE, false);
+        _nonDurableAlwaysPersist = createQueueWithArguments(session, 
NONDURABLE_ALWAYS_PERSIST_NAME, arguments);
+
+        bindQueue(session, "amq.topic", DURABLE_ALWAYS_PERSIST_NAME, 
"Y.*.*.*");
+        bindQueue(session, "amq.topic", DURABLE_NEVER_PERSIST_NAME, "*.Y.*.*");
+        bindQueue(session, "amq.topic", DURABLE_DEFAULT_PERSIST_NAME, 
"*.*.Y.*");
+        bindQueue(session, "amq.topic", NONDURABLE_ALWAYS_PERSIST_NAME, 
"*.*.*.Y");
+
+        _topicNameFormat = isBroker10() ? "amq.topic/%s" : "%s";
 
-        amqSession.bindQueue(DURABLE_ALWAYS_PERSIST_NAME,
-                             "Y.*.*.*",
-                             null,
-                             ExchangeDefaults.TOPIC_EXCHANGE_NAME,
-                             null);
-
-        amqSession.bindQueue(DURABLE_NEVER_PERSIST_NAME,
-                             "*.Y.*.*",
-                             null,
-                             ExchangeDefaults.TOPIC_EXCHANGE_NAME,
-                             null);
-
-        amqSession.bindQueue(DURABLE_DEFAULT_PERSIST_NAME,
-                             "*.*.Y.*",
-                             null,
-                             ExchangeDefaults.TOPIC_EXCHANGE_NAME,
-                             null);
-
-        amqSession.bindQueue(NONDURABLE_ALWAYS_PERSIST_NAME,
-                             "*.*.*.Y",
-                             null,
-                             ExchangeDefaults.TOPIC_EXCHANGE_NAME,
-                             null);
+        conn.close();
     }
 
     public void testSendPersistentMessageToAll() throws Exception
     {
-        Connection conn = getConnection();
+        Connection conn = createStartedConnection();
         Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
         MessageProducer producer = session.createProducer(null);
         conn.start();
-        producer.send(session.createTopic("Y.Y.Y.Y"), 
session.createTextMessage("test"));
+        producer.send(session.createTopic(String.format(_topicNameFormat, 
"Y.Y.Y.Y")), session.createTextMessage("test"));
         session.commit();
 
-        AMQSession amqSession = (AMQSession) session;
-        
assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
-        
assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
-        
assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
-        
assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+        assertEquals(1, getQueueDepth(conn, _durableAlwaysPersist));
+        assertEquals(1, getQueueDepth(conn, _durableNeverPersist));
+        assertEquals(1, getQueueDepth(conn, _durableDefaultPersist));
+        assertEquals(1, getQueueDepth(conn,_nonDurableAlwaysPersist));
 
         restartDefaultBroker();
 
-        conn = getConnection();
-        session = conn.createSession(true, Session.SESSION_TRANSACTED);
-        amqSession = (AMQSession) session;
-        
assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
-        
assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
-        
assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
-
-        assertFalse(amqSession.isQueueBound((AMQDestination) 
session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+        conn = createStartedConnection();
+        assertEquals(1, getQueueDepth(conn, _durableAlwaysPersist));
+        assertEquals(0, getQueueDepth(conn, _durableNeverPersist));
+        assertEquals(1, getQueueDepth(conn, _durableDefaultPersist));
 
+        assertFalse(isQueueExist(conn, _nonDurableAlwaysPersist));
     }
 
-
     public void testSendNonPersistentMessageToAll() throws Exception
     {
-        Connection conn = getConnection();
+        Connection conn = createStartedConnection();
         Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
         MessageProducer producer = session.createProducer(null);
         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
         conn.start();
-        producer.send(session.createTopic("Y.Y.Y.Y"), 
session.createTextMessage("test"));
+        producer.send(session.createTopic(String.format(_topicNameFormat, 
"Y.Y.Y.Y")), session.createTextMessage("test"));
         session.commit();
 
-        AMQSession amqSession = (AMQSession) session;
-        
assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
-        
assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
-        
assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
-        
assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+        assertEquals(1, getQueueDepth(conn, _durableAlwaysPersist));
+        assertEquals(1, getQueueDepth(conn, _durableNeverPersist));
+        assertEquals(1, getQueueDepth(conn, _durableDefaultPersist));
+        assertEquals(1, getQueueDepth(conn,_nonDurableAlwaysPersist));
 
         restartDefaultBroker();
 
-        conn = getConnection();
-        session = conn.createSession(true, Session.SESSION_TRANSACTED);
-        amqSession = (AMQSession) session;
-        
assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
-        
assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
-        
assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+        conn = createStartedConnection();
+        assertEquals(1, getQueueDepth(conn, _durableAlwaysPersist));
+        assertEquals(0, getQueueDepth(conn, _durableNeverPersist));
+        assertEquals(0, getQueueDepth(conn, _durableDefaultPersist));
 
-        
assertFalse(amqSession.isQueueBound((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+        assertFalse(isQueueExist(conn, _nonDurableAlwaysPersist));
 
     }
 
     public void testNonPersistentContentRetained() throws Exception
     {
-        Connection conn = getConnection();
+        Connection conn = createStartedConnection();
         Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
         MessageProducer producer = session.createProducer(null);
         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
         conn.start();
-        producer.send(session.createTopic("N.N.Y.Y"), 
session.createTextMessage("test1"));
-        producer.send(session.createTopic("Y.N.Y.Y"), 
session.createTextMessage("test2"));
+        producer.send(session.createTopic(String.format(_topicNameFormat, 
"N.N.Y.Y")), session.createTextMessage("test1"));
+        producer.send(session.createTopic(String.format(_topicNameFormat, 
"Y.N.Y.Y")), session.createTextMessage("test2"));
         session.commit();
-        MessageConsumer consumer = 
session.createConsumer(session.createQueue(DURABLE_ALWAYS_PERSIST_NAME));
-        Message msg = consumer.receive(1000l);
+        MessageConsumer consumer = 
session.createConsumer(_durableAlwaysPersist);
+        Message msg = consumer.receive(getReceiveTimeout());
         assertNotNull(msg);
         assertTrue(msg instanceof TextMessage);
         assertEquals("test2", ((TextMessage) msg).getText());
         session.rollback();
         restartDefaultBroker();
-        conn = getConnection();
-        conn.start();
+        conn = createStartedConnection();
         session = conn.createSession(true, Session.SESSION_TRANSACTED);
-        AMQSession amqSession = (AMQSession) session;
-        assertEquals(1, amqSession.getQueueDepth((AMQDestination) 
session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
-        
assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
-        
assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
-        consumer = 
session.createConsumer(session.createQueue(DURABLE_ALWAYS_PERSIST_NAME));
-        msg = consumer.receive(1000l);
+        assertEquals(1, getQueueDepth(conn, _durableAlwaysPersist));
+        assertEquals(0, getQueueDepth(conn, _durableNeverPersist));
+        assertEquals(0, getQueueDepth(conn, _durableDefaultPersist));
+
+        consumer = session.createConsumer(_durableAlwaysPersist);
+        msg = consumer.receive(getReceiveTimeout());
         assertNotNull(msg);
         assertTrue(msg instanceof TextMessage);
         assertEquals("test2", ((TextMessage)msg).getText());
@@ -189,27 +171,51 @@ public class QueueMessageDurabilityTest
     public void testPersistentContentRetainedOnTransientQueue() throws 
Exception
     {
         
setTestClientSystemProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, 
"false");
-        Connection conn = getConnection();
+        Connection conn = createStartedConnection();
         Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
         MessageProducer producer = session.createProducer(null);
         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
         conn.start();
-        producer.send(session.createTopic("N.N.Y.Y"), 
session.createTextMessage("test1"));
+        producer.send(session.createTopic(String.format(_topicNameFormat, 
"N.N.Y.Y")), session.createTextMessage("test1"));
         session.commit();
-        MessageConsumer consumer = 
session.createConsumer(session.createQueue(DURABLE_DEFAULT_PERSIST_NAME));
-        Message msg = consumer.receive(1000l);
+        MessageConsumer consumer = 
session.createConsumer(_durableDefaultPersist);
+        Message msg = consumer.receive(getReceiveTimeout());
         assertNotNull(msg);
         assertTrue(msg instanceof TextMessage);
         assertEquals("test1", ((TextMessage)msg).getText());
         session.commit();
         System.gc();
-        consumer = 
session.createConsumer(session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME));
-        msg = consumer.receive(1000l);
+        consumer = session.createConsumer(_nonDurableAlwaysPersist);
+        msg = consumer.receive(getReceiveTimeout());
         assertNotNull(msg);
         assertTrue(msg instanceof TextMessage);
         assertEquals("test1", ((TextMessage)msg).getText());
         session.commit();
     }
 
+    private Connection createStartedConnection() throws JMSException, 
NamingException
+    {
+        Connection conn = getConnection();
+        conn.start();
+        return conn;
+    }
+
+    private Queue createQueueWithArguments(final Session session,
+                                           final String testQueueName,
+                                           final Map<String, Object> 
arguments) throws Exception
+    {
+        createEntityUsingAmqpManagement(testQueueName, session, 
"org.apache.qpid.Queue", arguments);
+        return getQueueFromName(session, testQueueName);
+    }
+
+    private void bindQueue(final Session session, final String exchange, final 
String queueName,
+                           final String bindingKey) throws Exception
+    {
+
+        final Map<String, Object> arguments = new HashMap<>();
+        arguments.put("destination", queueName);
+        arguments.put("bindingKey", bindingKey);
+        performOperationUsingAmqpManagement(exchange, "bind", session, 
"org.apache.qpid.TopicExchange", arguments);
+    }
 
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to