QPID-6933: [System Tests] Refactor PersistentStoreTest as JMS 1.1 system test
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/ccf691b2 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/ccf691b2 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/ccf691b2 Branch: refs/heads/master Commit: ccf691b2caeb16e2f9421146cfefd3489a6bf5bf Parents: 835efa5 Author: Keith Wall <[email protected]> Authored: Thu Jan 11 17:03:57 2018 +0000 Committer: Keith Wall <[email protected]> Committed: Thu Jan 11 17:07:43 2018 +0000 ---------------------------------------------------------------------- .../persistence/PersistentMessagingTest.java | 265 +++++++++++++++++++ .../qpid/server/store/PersistentStoreTest.java | 219 --------------- test-profiles/CPPExcludes | 1 - test-profiles/JavaTransientExcludes | 1 - 4 files changed, 265 insertions(+), 221 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ccf691b2/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/persistence/PersistentMessagingTest.java ---------------------------------------------------------------------- diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/persistence/PersistentMessagingTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/persistence/PersistentMessagingTest.java new file mode 100644 index 0000000..619d9d8 --- /dev/null +++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/persistence/PersistentMessagingTest.java @@ -0,0 +1,265 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.systests.jms_1_1.persistence; + +import static javax.jms.DeliveryMode.NON_PERSISTENT; +import static javax.jms.DeliveryMode.PERSISTENT; +import static javax.jms.Session.CLIENT_ACKNOWLEDGE; +import static javax.jms.Session.SESSION_TRANSACTED; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeThat; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.qpid.systests.JmsTestBase; + +public class PersistentMessagingTest extends JmsTestBase +{ + private static final int MSG_COUNT = 3; + private static final String INT_PROPERTY = "index"; + private static final String STRING_PROPERTY = "string"; + + @Before + public void setUp() throws Exception + { + assumeThat("Tests requires persistent store", getBrokerAdmin().supportsRestart(), is(true)); + } + + @Test + public void committedPersistentMessagesSurviveBrokerRestart() throws Exception + { + Queue queue = createQueue(getTestName()); + Connection sendingConnection = getConnection(); + List<Message> sentMessages = new ArrayList<>(); + try + { + Session session = sendingConnection.createSession(true, SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(queue); + + sentMessages.addAll(sendMessages(session, producer, PERSISTENT, 0, MSG_COUNT)); + sendMessages(session, producer, NON_PERSISTENT, MSG_COUNT, 1); + } + finally + { + sendingConnection.close(); + } + + getBrokerAdmin().restart(); + + verifyQueueContents(queue, sentMessages); + } + + @Test + public void uncommittedPersistentMessagesDoNotSurviveBrokerRestart() throws Exception + { + Queue queue = createQueue(getTestName()); + Connection sendingConnection = getConnection(); + try + { + Session session = sendingConnection.createSession(true, SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(queue); + + producer.send(session.createMessage()); + // do not commit + } + finally + { + sendingConnection.close(); + } + + getBrokerAdmin().restart(); + + Connection receivingConnection = getConnection(); + try + { + receivingConnection.start(); + Session session = receivingConnection.createSession(true, SESSION_TRANSACTED); + MessageConsumer consumer = session.createConsumer(queue); + + final Message unexpectedMessage = consumer.receiveNoWait(); + assertNull(String.format("Unexpected message [%s] received", unexpectedMessage), unexpectedMessage); + } + finally + { + receivingConnection.close(); + } + } + + @Test + public void transactedAcknowledgementPersistence() throws Exception + { + Queue queue = createQueue(getTestName()); + Connection initialConnection = getConnection(); + List<Message> remainingMessages = new ArrayList<>(); + try + { + initialConnection.start(); + Session session = initialConnection.createSession(true, SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(queue); + + final List<Message> initialMessage = sendMessages(session, producer, PERSISTENT, 0, 1); + remainingMessages.addAll(sendMessages(session, producer, PERSISTENT, 1, 1)); + + // Receive first message and commit + MessageConsumer consumer = session.createConsumer(queue); + receiveAndVerifyMessages(session, consumer, initialMessage); + // Receive second message but do not commit + final Message peek = consumer.receive(getReceiveTimeout()); + assertNotNull(peek); + } + finally + { + initialConnection.close(); + } + + getBrokerAdmin().restart(); + + verifyQueueContents(queue, remainingMessages); + } + + @Test + public void clientAckAcknowledgementPersistence() throws Exception + { + Queue queue = createQueue(getTestName()); + Connection initialConnection = getConnection(); + List<Message> remainingMessages = new ArrayList<>(); + try + { + initialConnection.start(); + Session publishingSession = initialConnection.createSession(true, SESSION_TRANSACTED); + MessageProducer producer = publishingSession.createProducer(queue); + + final List<Message> initialMessages = sendMessages(publishingSession, producer, PERSISTENT, 0, 1); + remainingMessages.addAll(sendMessages(publishingSession, producer, PERSISTENT, 1, 1)); + + Session consumingSession = initialConnection.createSession(false, CLIENT_ACKNOWLEDGE); + + // Receive first message and ack + MessageConsumer consumer = consumingSession.createConsumer(queue); + receiveAndVerifyMessages(consumingSession, consumer, initialMessages); + + // Receive second but do not ack + final Message peek = consumer.receive(getReceiveTimeout()); + assertNotNull(peek); + } + finally + { + initialConnection.close(); + } + + getBrokerAdmin().restart(); + + verifyQueueContents(queue, remainingMessages); + } + + private List<Message> sendMessages(Session session, MessageProducer producer, + final int deliveryMode, + final int startIndex, final int count) throws Exception + { + final List<Message> sentMessages = new ArrayList<>(); + for (int i = startIndex; i < startIndex + count; i++) + { + Message message = session.createTextMessage(UUID.randomUUID().toString()); + message.setIntProperty(INT_PROPERTY, i); + message.setStringProperty(STRING_PROPERTY, UUID.randomUUID().toString()); + + producer.send(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + sentMessages.add(message); + } + + session.commit(); + return sentMessages; + } + + private void verifyQueueContents(final Queue queue, final List<Message> expectedMessages) throws Exception + { + Connection receivingConnection = getConnection(); + try + { + receivingConnection.start(); + Session session = receivingConnection.createSession(true, SESSION_TRANSACTED); + MessageConsumer consumer = session.createConsumer(queue); + + receiveAndVerifyMessages(session, consumer, expectedMessages); + + final Message unexpectedMessage = consumer.receiveNoWait(); + assertNull(String.format("Unexpected additional message [%s] received", unexpectedMessage), unexpectedMessage); + } + finally + { + receivingConnection.close(); + } + } + + private void receiveAndVerifyMessages(final Session session, + final MessageConsumer consumer, + final List<Message> expectedMessages) throws Exception + { + + for (Message expected : expectedMessages) + { + final Message received = consumer.receive(getReceiveTimeout()); + assertNotNull(String.format("Message not received when expecting message %d", expected.getIntProperty(INT_PROPERTY)), received); + + assertTrue("Unexpected type", expected instanceof TextMessage); + assertEquals("Unexpected index", + expected.getIntProperty(INT_PROPERTY), + received.getIntProperty(INT_PROPERTY)); + assertEquals("Unexpected string property", + expected.getStringProperty(STRING_PROPERTY), + received.getStringProperty(STRING_PROPERTY)); + assertEquals("Unexpected message content", + ((TextMessage) expected).getText(), + ((TextMessage) received).getText()); + + final int acknowledgeMode = session.getAcknowledgeMode(); + if (acknowledgeMode == SESSION_TRANSACTED) + { + session.commit(); + } + else if (acknowledgeMode == CLIENT_ACKNOWLEDGE) + { + received.acknowledge(); + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ccf691b2/systests/src/test/java/org/apache/qpid/server/store/PersistentStoreTest.java ---------------------------------------------------------------------- diff --git a/systests/src/test/java/org/apache/qpid/server/store/PersistentStoreTest.java b/systests/src/test/java/org/apache/qpid/server/store/PersistentStoreTest.java deleted file mode 100644 index ea621ac..0000000 --- a/systests/src/test/java/org/apache/qpid/server/store/PersistentStoreTest.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.store; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -/** - * TODO KW acknowledgement persistence - */ -public class PersistentStoreTest extends QpidBrokerTestCase -{ - private static final int NUM_MESSAGES = 100; - private Connection _con; - private Session _session; - private Destination _destination; - - @Override - public void setUp() throws Exception - { - super.setUp(); - _con = getConnection(); - } - - public void testCommittedMessagesSurviveBrokerNormalShutdown() throws Exception - { - sendAndCommitMessages(); - stopDefaultBroker(); - startDefaultBroker(); - confirmBrokerStillHasCommittedMessages(); - } - - public void testCommittedMessagesSurviveBrokerAbnormalShutdown() throws Exception - { - if (isInternalBroker()) - { - return; - } - - sendAndCommitMessages(); - killDefaultBroker(); - startDefaultBroker(); - confirmBrokerStillHasCommittedMessages(); - } - - public void testCommittedMessagesSurviveBrokerNormalShutdownMidTransaction() throws Exception - { - sendAndCommitMessages(); - sendMoreMessagesWithoutCommitting(); - stopDefaultBroker(); - startDefaultBroker(); - confirmBrokerStillHasCommittedMessages(); - } - - public void testCommittedMessagesSurviveBrokerAbnormalShutdownMidTransaction() throws Exception - { - if (isInternalBroker()) - { - return; - } - sendAndCommitMessages(); - sendMoreMessagesWithoutCommitting(); - killDefaultBroker(); - startDefaultBroker(); - confirmBrokerStillHasCommittedMessages(); - } - - public void testHeaderPersistence() throws Exception - { - String testQueueName = getTestQueueName(); - String replyToQueue = testQueueName + "_reply"; - _con.start(); - _session = _con.createSession(true, Session.SESSION_TRANSACTED); - _destination = createTestQueue(_session, testQueueName); - Destination replyTo = createTestQueue(_session, replyToQueue); - MessageConsumer consumer = _session.createConsumer(_destination); - MessageProducer producer = _session.createProducer(_destination); - - final long expiration = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1); - final int priority = 3; - final String propertyKey = "mystring"; - final String propertyValue = "string"; - - Message msg = _session.createMessage(); - msg.setStringProperty(propertyKey, propertyValue); - msg.setJMSExpiration(expiration); - msg.setJMSReplyTo(replyTo); - - producer.send(msg, DeliveryMode.PERSISTENT, priority, expiration); - _session.commit(); - - final String sentMessageId = msg.getJMSMessageID(); - - Message receivedMessage = consumer.receive(getReceiveTimeout()); - long receivedJmsExpiration = receivedMessage.getJMSExpiration(); - assertEquals("Unexpected JMS message id", sentMessageId, receivedMessage.getJMSMessageID()); - assertEquals("Unexpected JMS replyto", replyTo, receivedMessage.getJMSReplyTo()); - assertEquals("Unexpected JMS priority", priority, receivedMessage.getJMSPriority()); - assertTrue("Expecting expiration to be in the future", receivedJmsExpiration > 0); - assertTrue("Expecting user property to be present", receivedMessage.propertyExists(propertyKey)); - assertEquals("Unexpected user property", propertyValue, receivedMessage.getStringProperty(propertyKey)); - // Do not commit message so we can re-receive after Broker restart - - stopDefaultBroker(); - startDefaultBroker(); - - _con = getConnection(); - _con.start(); - _session = _con.createSession(true, Session.SESSION_TRANSACTED); - consumer = _session.createConsumer(_destination); - - Message rereceivedMessage = consumer.receive(getReceiveTimeout()); - assertEquals("Unexpected JMS message id", sentMessageId, rereceivedMessage.getJMSMessageID()); - assertEquals("Unexpected JMS replyto", replyTo, rereceivedMessage.getJMSReplyTo()); - assertEquals("Unexpected JMS priority", priority, rereceivedMessage.getJMSPriority()); - assertEquals("Expecting expiration to be unchanged", receivedJmsExpiration, rereceivedMessage.getJMSExpiration()); - assertTrue("Expecting user property to be present", rereceivedMessage.propertyExists(propertyKey)); - assertEquals("Unexpected user property", propertyValue, rereceivedMessage.getStringProperty(propertyKey)); - _session.commit(); - } - - private void sendAndCommitMessages() throws Exception - { - _session = _con.createSession(true, Session.SESSION_TRANSACTED); - _destination = createTestQueue(_session); - - sendMessage(_session, _destination, NUM_MESSAGES); - _session.commit(); - } - - private void sendMoreMessagesWithoutCommitting() throws Exception - { - sendMessage(_session, _destination, NUM_MESSAGES); - } - - private void confirmBrokerStillHasCommittedMessages() throws Exception - { - Connection con = getConnection(); - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - con.start(); - Destination destination = session.createQueue(getTestQueueName()); - MessageConsumer consumer = session.createConsumer(destination); - for (int i = 1; i <= NUM_MESSAGES; i++) - { - Message msg = consumer.receive(getReceiveTimeout()); - assertNotNull("Message " + i + " not received", msg); - assertEquals("Did not receive the expected message", i, msg.getIntProperty(INDEX)); - } - - Message msg = consumer.receive(getShortReceiveTimeout()); - if(msg != null) - { - fail("No more messages should be received, but received additional message with index: " + msg.getIntProperty(INDEX)); - } - } - - /** - * This test requires that we can send messages without committing. - * QTC always commits the messages sent via sendMessages. - * - * @param session the session to use for sending - * @param destination where to send them to - * @param count no. of messages to send - * - * @return the sent messages - * - * @throws Exception - */ - @Override - public List<Message> sendMessage(Session session, Destination destination, - int count) throws Exception - { - List<Message> messages = new ArrayList<>(count); - - MessageProducer producer = session.createProducer(destination); - - for (int i = 1; i <= count; i++) - { - Message next = createNextMessage(session, i); - - producer.send(next); - - messages.add(next); - } - - return messages; - } - -} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ccf691b2/test-profiles/CPPExcludes ---------------------------------------------------------------------- diff --git a/test-profiles/CPPExcludes b/test-profiles/CPPExcludes index 2d10c6c..9873099 100755 --- a/test-profiles/CPPExcludes +++ b/test-profiles/CPPExcludes @@ -47,7 +47,6 @@ org.apache.qpid.test.unit.close.FlowToDiskBackingQueueDeleteTest#* org.apache.qpid.server.AlertingTest#* // The C++ server has a totally different persistence mechanism -org.apache.qpid.server.store.PersistentStoreTest#* org.apache.qpid.server.store.SplitStoreTest#* // CPP Broker does not follow the same Logging convention as the Qpid Broker-J http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ccf691b2/test-profiles/JavaTransientExcludes ---------------------------------------------------------------------- diff --git a/test-profiles/JavaTransientExcludes b/test-profiles/JavaTransientExcludes index 66cd5e8..d4052de 100644 --- a/test-profiles/JavaTransientExcludes +++ b/test-profiles/JavaTransientExcludes @@ -18,7 +18,6 @@ // //These tests require a persistent store -org.apache.qpid.server.store.PersistentStoreTest#* org.apache.qpid.server.store.SplitStoreTest#* org.apache.qpid.server.logging.AlertingTest#testAlertingReallyWorksWithRestart org.apache.qpid.server.logging.AlertingTest#testAlertingReallyWorksWithChanges --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
