QPID-6933: [System Tests] Refactor PersistentStoreTest as JMS 1.1 system test


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/ccf691b2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/ccf691b2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/ccf691b2

Branch: refs/heads/master
Commit: ccf691b2caeb16e2f9421146cfefd3489a6bf5bf
Parents: 835efa5
Author: Keith Wall <[email protected]>
Authored: Thu Jan 11 17:03:57 2018 +0000
Committer: Keith Wall <[email protected]>
Committed: Thu Jan 11 17:07:43 2018 +0000

----------------------------------------------------------------------
 .../persistence/PersistentMessagingTest.java    | 265 +++++++++++++++++++
 .../qpid/server/store/PersistentStoreTest.java  | 219 ---------------
 test-profiles/CPPExcludes                       |   1 -
 test-profiles/JavaTransientExcludes             |   1 -
 4 files changed, 265 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ccf691b2/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/persistence/PersistentMessagingTest.java
----------------------------------------------------------------------
diff --git 
a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/persistence/PersistentMessagingTest.java
 
b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/persistence/PersistentMessagingTest.java
new file mode 100644
index 0000000..619d9d8
--- /dev/null
+++ 
b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/persistence/PersistentMessagingTest.java
@@ -0,0 +1,265 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systests.jms_1_1.persistence;
+
+import static javax.jms.DeliveryMode.NON_PERSISTENT;
+import static javax.jms.DeliveryMode.PERSISTENT;
+import static javax.jms.Session.CLIENT_ACKNOWLEDGE;
+import static javax.jms.Session.SESSION_TRANSACTED;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeThat;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.systests.JmsTestBase;
+
+public class PersistentMessagingTest extends JmsTestBase
+{
+    private static final int MSG_COUNT = 3;
+    private static final String INT_PROPERTY = "index";
+    private static final String STRING_PROPERTY = "string";
+
+    @Before
+    public void setUp() throws Exception
+    {
+        assumeThat("Tests requires persistent store", 
getBrokerAdmin().supportsRestart(), is(true));
+    }
+
+    @Test
+    public void committedPersistentMessagesSurviveBrokerRestart() throws 
Exception
+    {
+        Queue queue = createQueue(getTestName());
+        Connection sendingConnection = getConnection();
+        List<Message> sentMessages = new ArrayList<>();
+        try
+        {
+            Session session = sendingConnection.createSession(true, 
SESSION_TRANSACTED);
+            MessageProducer producer = session.createProducer(queue);
+
+            sentMessages.addAll(sendMessages(session, producer, PERSISTENT, 0, 
MSG_COUNT));
+            sendMessages(session, producer, NON_PERSISTENT, MSG_COUNT, 1);
+        }
+        finally
+        {
+            sendingConnection.close();
+        }
+
+        getBrokerAdmin().restart();
+
+        verifyQueueContents(queue, sentMessages);
+    }
+
+    @Test
+    public void uncommittedPersistentMessagesDoNotSurviveBrokerRestart() 
throws Exception
+    {
+        Queue queue = createQueue(getTestName());
+        Connection sendingConnection = getConnection();
+        try
+        {
+            Session session = sendingConnection.createSession(true, 
SESSION_TRANSACTED);
+            MessageProducer producer = session.createProducer(queue);
+
+            producer.send(session.createMessage());
+            // do not commit
+        }
+        finally
+        {
+            sendingConnection.close();
+        }
+
+        getBrokerAdmin().restart();
+
+        Connection receivingConnection = getConnection();
+        try
+        {
+            receivingConnection.start();
+            Session session = receivingConnection.createSession(true, 
SESSION_TRANSACTED);
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            final Message unexpectedMessage = consumer.receiveNoWait();
+            assertNull(String.format("Unexpected message [%s] received", 
unexpectedMessage), unexpectedMessage);
+        }
+        finally
+        {
+            receivingConnection.close();
+        }
+    }
+
+    @Test
+    public void transactedAcknowledgementPersistence() throws Exception
+    {
+        Queue queue = createQueue(getTestName());
+        Connection initialConnection = getConnection();
+        List<Message> remainingMessages = new ArrayList<>();
+        try
+        {
+            initialConnection.start();
+            Session session = initialConnection.createSession(true, 
SESSION_TRANSACTED);
+            MessageProducer producer = session.createProducer(queue);
+
+            final List<Message> initialMessage = sendMessages(session, 
producer, PERSISTENT, 0, 1);
+            remainingMessages.addAll(sendMessages(session, producer, 
PERSISTENT, 1, 1));
+
+            // Receive first message and commit
+            MessageConsumer consumer = session.createConsumer(queue);
+            receiveAndVerifyMessages(session, consumer, initialMessage);
+            // Receive second message but do not commit
+            final Message peek = consumer.receive(getReceiveTimeout());
+            assertNotNull(peek);
+        }
+        finally
+        {
+            initialConnection.close();
+        }
+
+        getBrokerAdmin().restart();
+
+        verifyQueueContents(queue, remainingMessages);
+    }
+
+    @Test
+    public void clientAckAcknowledgementPersistence() throws Exception
+    {
+        Queue queue = createQueue(getTestName());
+        Connection initialConnection = getConnection();
+        List<Message> remainingMessages = new ArrayList<>();
+        try
+        {
+            initialConnection.start();
+            Session publishingSession = initialConnection.createSession(true, 
SESSION_TRANSACTED);
+            MessageProducer producer = publishingSession.createProducer(queue);
+
+            final List<Message> initialMessages = 
sendMessages(publishingSession, producer, PERSISTENT, 0, 1);
+            remainingMessages.addAll(sendMessages(publishingSession, producer, 
PERSISTENT, 1, 1));
+
+            Session consumingSession = initialConnection.createSession(false, 
CLIENT_ACKNOWLEDGE);
+
+            // Receive first message and ack
+            MessageConsumer consumer = consumingSession.createConsumer(queue);
+            receiveAndVerifyMessages(consumingSession, consumer, 
initialMessages);
+
+            // Receive second but do not ack
+            final Message peek = consumer.receive(getReceiveTimeout());
+            assertNotNull(peek);
+        }
+        finally
+        {
+            initialConnection.close();
+        }
+
+        getBrokerAdmin().restart();
+
+        verifyQueueContents(queue, remainingMessages);
+    }
+
+    private List<Message> sendMessages(Session session, MessageProducer 
producer,
+                                       final int deliveryMode,
+                                       final int startIndex, final int count) 
throws Exception
+    {
+        final List<Message> sentMessages = new ArrayList<>();
+        for (int i = startIndex; i < startIndex + count; i++)
+        {
+            Message message = 
session.createTextMessage(UUID.randomUUID().toString());
+            message.setIntProperty(INT_PROPERTY, i);
+            message.setStringProperty(STRING_PROPERTY, 
UUID.randomUUID().toString());
+
+            producer.send(message, deliveryMode, Message.DEFAULT_PRIORITY, 
Message.DEFAULT_TIME_TO_LIVE);
+            sentMessages.add(message);
+        }
+
+        session.commit();
+        return sentMessages;
+    }
+
+    private void verifyQueueContents(final Queue queue, final List<Message> 
expectedMessages) throws Exception
+    {
+        Connection receivingConnection = getConnection();
+        try
+        {
+            receivingConnection.start();
+            Session session = receivingConnection.createSession(true, 
SESSION_TRANSACTED);
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            receiveAndVerifyMessages(session, consumer, expectedMessages);
+
+            final Message unexpectedMessage = consumer.receiveNoWait();
+            assertNull(String.format("Unexpected additional message [%s] 
received", unexpectedMessage), unexpectedMessage);
+        }
+        finally
+        {
+            receivingConnection.close();
+        }
+    }
+
+    private void receiveAndVerifyMessages(final Session session,
+                                          final MessageConsumer consumer,
+                                          final List<Message> 
expectedMessages) throws Exception
+    {
+
+        for (Message expected : expectedMessages)
+        {
+            final Message received = consumer.receive(getReceiveTimeout());
+            assertNotNull(String.format("Message not received when expecting 
message %d", expected.getIntProperty(INT_PROPERTY)), received);
+
+            assertTrue("Unexpected type", expected instanceof TextMessage);
+            assertEquals("Unexpected index",
+                         expected.getIntProperty(INT_PROPERTY),
+                         received.getIntProperty(INT_PROPERTY));
+            assertEquals("Unexpected string property",
+                         expected.getStringProperty(STRING_PROPERTY),
+                         received.getStringProperty(STRING_PROPERTY));
+            assertEquals("Unexpected message content",
+                         ((TextMessage) expected).getText(),
+                         ((TextMessage) received).getText());
+
+            final int acknowledgeMode = session.getAcknowledgeMode();
+            if (acknowledgeMode == SESSION_TRANSACTED)
+            {
+                session.commit();
+            }
+            else if (acknowledgeMode == CLIENT_ACKNOWLEDGE)
+            {
+                received.acknowledge();
+            }
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ccf691b2/systests/src/test/java/org/apache/qpid/server/store/PersistentStoreTest.java
----------------------------------------------------------------------
diff --git 
a/systests/src/test/java/org/apache/qpid/server/store/PersistentStoreTest.java 
b/systests/src/test/java/org/apache/qpid/server/store/PersistentStoreTest.java
deleted file mode 100644
index ea621ac..0000000
--- 
a/systests/src/test/java/org/apache/qpid/server/store/PersistentStoreTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.server.store;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-/**
- * TODO KW acknowledgement persistence
- */
-public class PersistentStoreTest extends QpidBrokerTestCase
-{
-    private static final int NUM_MESSAGES = 100;
-    private Connection _con;
-    private Session _session;
-    private Destination _destination;
-
-    @Override
-    public void setUp() throws Exception
-    {
-        super.setUp();
-        _con = getConnection();
-    }
-
-    public void testCommittedMessagesSurviveBrokerNormalShutdown() throws 
Exception
-    {
-        sendAndCommitMessages();
-        stopDefaultBroker();
-        startDefaultBroker();
-        confirmBrokerStillHasCommittedMessages();
-    }
-
-    public void testCommittedMessagesSurviveBrokerAbnormalShutdown() throws 
Exception
-    {
-        if (isInternalBroker())
-        {
-            return;
-        }
-
-        sendAndCommitMessages();
-        killDefaultBroker();
-        startDefaultBroker();
-        confirmBrokerStillHasCommittedMessages();
-    }
-
-    public void 
testCommittedMessagesSurviveBrokerNormalShutdownMidTransaction() throws 
Exception
-    {
-        sendAndCommitMessages();
-        sendMoreMessagesWithoutCommitting();
-        stopDefaultBroker();
-        startDefaultBroker();
-        confirmBrokerStillHasCommittedMessages();
-    }
-
-    public void 
testCommittedMessagesSurviveBrokerAbnormalShutdownMidTransaction() throws 
Exception
-    {
-        if (isInternalBroker())
-        {
-            return;
-        }
-        sendAndCommitMessages();
-        sendMoreMessagesWithoutCommitting();
-        killDefaultBroker();
-        startDefaultBroker();
-        confirmBrokerStillHasCommittedMessages();
-    }
-
-    public void testHeaderPersistence() throws Exception
-    {
-        String testQueueName = getTestQueueName();
-        String replyToQueue = testQueueName + "_reply";
-        _con.start();
-        _session = _con.createSession(true, Session.SESSION_TRANSACTED);
-        _destination = createTestQueue(_session, testQueueName);
-        Destination replyTo = createTestQueue(_session, replyToQueue);
-        MessageConsumer consumer = _session.createConsumer(_destination);
-        MessageProducer producer = _session.createProducer(_destination);
-
-        final long expiration = System.currentTimeMillis() + 
TimeUnit.HOURS.toMillis(1);
-        final int priority = 3;
-        final String propertyKey = "mystring";
-        final String propertyValue = "string";
-
-        Message msg = _session.createMessage();
-        msg.setStringProperty(propertyKey, propertyValue);
-        msg.setJMSExpiration(expiration);
-        msg.setJMSReplyTo(replyTo);
-
-        producer.send(msg, DeliveryMode.PERSISTENT, priority, expiration);
-        _session.commit();
-
-        final String sentMessageId = msg.getJMSMessageID();
-
-        Message receivedMessage = consumer.receive(getReceiveTimeout());
-        long receivedJmsExpiration = receivedMessage.getJMSExpiration();
-        assertEquals("Unexpected JMS message id", sentMessageId, 
receivedMessage.getJMSMessageID());
-        assertEquals("Unexpected JMS replyto", replyTo, 
receivedMessage.getJMSReplyTo());
-        assertEquals("Unexpected JMS priority", priority, 
receivedMessage.getJMSPriority());
-        assertTrue("Expecting expiration to be in the future", 
receivedJmsExpiration > 0);
-        assertTrue("Expecting user property to be present", 
receivedMessage.propertyExists(propertyKey));
-        assertEquals("Unexpected user property", propertyValue, 
receivedMessage.getStringProperty(propertyKey));
-        // Do not commit message so we can re-receive after Broker restart
-
-        stopDefaultBroker();
-        startDefaultBroker();
-
-        _con = getConnection();
-        _con.start();
-        _session = _con.createSession(true, Session.SESSION_TRANSACTED);
-        consumer = _session.createConsumer(_destination);
-
-        Message rereceivedMessage = consumer.receive(getReceiveTimeout());
-        assertEquals("Unexpected JMS message id", sentMessageId, 
rereceivedMessage.getJMSMessageID());
-        assertEquals("Unexpected JMS replyto", replyTo, 
rereceivedMessage.getJMSReplyTo());
-        assertEquals("Unexpected JMS priority", priority, 
rereceivedMessage.getJMSPriority());
-        assertEquals("Expecting expiration to be unchanged", 
receivedJmsExpiration, rereceivedMessage.getJMSExpiration());
-        assertTrue("Expecting user property to be present", 
rereceivedMessage.propertyExists(propertyKey));
-        assertEquals("Unexpected user property", propertyValue, 
rereceivedMessage.getStringProperty(propertyKey));
-        _session.commit();
-    }
-
-    private void sendAndCommitMessages() throws Exception
-    {
-        _session = _con.createSession(true, Session.SESSION_TRANSACTED);
-        _destination = createTestQueue(_session);
-
-        sendMessage(_session, _destination, NUM_MESSAGES);
-        _session.commit();
-    }
-
-    private void sendMoreMessagesWithoutCommitting() throws Exception
-    {
-        sendMessage(_session, _destination, NUM_MESSAGES);
-    }
-
-    private void confirmBrokerStillHasCommittedMessages() throws Exception
-    {
-        Connection con = getConnection();
-        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        con.start();
-        Destination destination = session.createQueue(getTestQueueName());
-        MessageConsumer consumer = session.createConsumer(destination);
-        for (int i = 1; i <= NUM_MESSAGES; i++)
-        {
-            Message msg = consumer.receive(getReceiveTimeout());
-            assertNotNull("Message " + i + " not received", msg);
-            assertEquals("Did not receive the expected message", i, 
msg.getIntProperty(INDEX));
-        }
-
-        Message msg = consumer.receive(getShortReceiveTimeout());
-        if(msg != null)
-        {
-            fail("No more messages should be received, but received additional 
message with index: " + msg.getIntProperty(INDEX));
-        }
-    }
-
-    /**
-     * This test requires that we can send messages without committing.
-     * QTC always commits the messages sent via sendMessages.
-     *
-     * @param session the session to use for sending
-     * @param destination where to send them to
-     * @param count no. of messages to send
-     *
-     * @return the sent messages
-     *
-     * @throws Exception
-     */
-    @Override
-    public List<Message> sendMessage(Session session, Destination destination,
-                                     int count) throws Exception
-    {
-        List<Message> messages = new ArrayList<>(count);
-
-        MessageProducer producer = session.createProducer(destination);
-
-        for (int i = 1; i <= count; i++)
-        {
-            Message next = createNextMessage(session, i);
-
-            producer.send(next);
-
-            messages.add(next);
-        }
-
-        return messages;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ccf691b2/test-profiles/CPPExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/CPPExcludes b/test-profiles/CPPExcludes
index 2d10c6c..9873099 100755
--- a/test-profiles/CPPExcludes
+++ b/test-profiles/CPPExcludes
@@ -47,7 +47,6 @@ 
org.apache.qpid.test.unit.close.FlowToDiskBackingQueueDeleteTest#*
 org.apache.qpid.server.AlertingTest#*
 
 // The C++ server has a totally different persistence mechanism
-org.apache.qpid.server.store.PersistentStoreTest#*
 org.apache.qpid.server.store.SplitStoreTest#*
 
 // CPP Broker does not follow the same Logging convention as the Qpid Broker-J

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ccf691b2/test-profiles/JavaTransientExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/JavaTransientExcludes 
b/test-profiles/JavaTransientExcludes
index 66cd5e8..d4052de 100644
--- a/test-profiles/JavaTransientExcludes
+++ b/test-profiles/JavaTransientExcludes
@@ -18,7 +18,6 @@
 //
 
 //These tests require a persistent store
-org.apache.qpid.server.store.PersistentStoreTest#*
 org.apache.qpid.server.store.SplitStoreTest#*
 org.apache.qpid.server.logging.AlertingTest#testAlertingReallyWorksWithRestart
 org.apache.qpid.server.logging.AlertingTest#testAlertingReallyWorksWithChanges


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to