QPID-6933: [System Tests] Refactor queue overflow policy tests as JMS 1.1 system test
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/cd432bcf Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/cd432bcf Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/cd432bcf Branch: refs/heads/master Commit: cd432bcf6cbebf189f884fbbd6c3e789a50ab7b8 Parents: 2925d8a Author: Alex Rudyy <[email protected]> Authored: Fri Dec 29 12:00:34 2017 +0000 Committer: Alex Rudyy <[email protected]> Committed: Fri Dec 29 12:00:34 2017 +0000 ---------------------------------------------------------------------- .../extensions/queue/QueuePolicyTest.java | 190 +++++++++++++++++++ .../qpid/server/queue/FlowToDiskTest.java | 90 --------- .../qpid/test/client/queue/QueuePolicyTest.java | 140 -------------- test-profiles/CPPExcludes | 2 - test-profiles/Java10UninvestigatedTestsExcludes | 2 - test-profiles/JavaTransientExcludes | 1 - 6 files changed, 190 insertions(+), 235 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/cd432bcf/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/QueuePolicyTest.java ---------------------------------------------------------------------- diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/QueuePolicyTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/QueuePolicyTest.java new file mode 100644 index 0000000..56f49f7 --- /dev/null +++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/QueuePolicyTest.java @@ -0,0 +1,190 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.systests.jms_1_1.extensions.queue; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.Assume.assumeThat; + +import java.util.HashMap; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.junit.Test; + +import org.apache.qpid.server.model.OverflowPolicy; +import org.apache.qpid.systests.JmsTestBase; + +public class QueuePolicyTest extends JmsTestBase +{ + + @Test + public void testRejectPolicyMessageDepth() throws Exception + { + Destination destination = createQueue(getTestName(), OverflowPolicy.REJECT, 5); + Connection connection = getConnectionBuilder().setSyncPublish(true).build(); + try + { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(destination); + + for (int i = 0; i < 5; i++) + { + producer.send(session.createMessage()); + session.commit(); + } + + try + { + producer.send(session.createMessage()); + session.commit(); + fail("The client did not receive an exception after exceeding the queue limit"); + } + catch (JMSException e) + { + // pass + } + } + finally + { + connection.close(); + } + + Connection secondConnection = getConnection(); + try + { + secondConnection.start(); + + Session secondSession = secondConnection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = secondSession.createConsumer(destination); + Message receivedMessage = consumer.receive(getReceiveTimeout()); + assertNotNull("Message is not received", receivedMessage); + secondSession.commit(); + + MessageProducer secondProducer = secondSession.createProducer(destination); + secondProducer.send(secondSession.createMessage()); + secondSession.commit(); + } + finally + { + secondConnection.close(); + } + } + + @Test + public void testRingPolicy() throws Exception + { + Destination destination = createQueue(getTestName(), OverflowPolicy.RING, 2); + Connection connection = getConnection(); + try + { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(destination); + producer.send(session.createTextMessage("Test1")); + producer.send(session.createTextMessage("Test2")); + producer.send(session.createTextMessage("Test3")); + + MessageConsumer consumer = session.createConsumer(destination); + connection.start(); + + TextMessage receivedMessage = (TextMessage) consumer.receive(getReceiveTimeout()); + assertNotNull("The consumer should receive the receivedMessage with body='Test2'", receivedMessage); + assertEquals("Unexpected first message", "Test2", receivedMessage.getText()); + + receivedMessage = (TextMessage) consumer.receive(getReceiveTimeout()); + assertNotNull("The consumer should receive the receivedMessage with body='Test3'", receivedMessage); + assertEquals("Unexpected second message", "Test3", receivedMessage.getText()); + } + finally + { + connection.close(); + } + } + + @Test + public void testRoundtripWithFlowToDisk() throws Exception + { + assumeThat("Test requires persistent store", getBrokerAdmin().supportsRestart(), is(true)); + + String queueName = getTestName(); + final Map<String, Object> arguments = new HashMap<String, Object>(); + arguments.put(org.apache.qpid.server.model.Queue.OVERFLOW_POLICY, OverflowPolicy.FLOW_TO_DISK.name()); + arguments.put(org.apache.qpid.server.model.Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 0L); + createEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue", arguments); + Queue queue = createQueue(queueName); + + Map<String, Object> statistics = getVirtualHostStatistics("bytesEvacuatedFromMemory"); + Long originalBytesEvacuatedFromMemory = (Long) statistics.get("bytesEvacuatedFromMemory"); + + Connection connection = getConnection(); + try + { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + TextMessage message = session.createTextMessage("testMessage"); + MessageProducer producer = session.createProducer(queue); + producer.send(message); + session.commit(); + + // make sure we are flowing to disk + Map<String, Object> statistics2 = getVirtualHostStatistics("bytesEvacuatedFromMemory"); + Long bytesEvacuatedFromMemory = (Long) statistics2.get("bytesEvacuatedFromMemory"); + assertTrue("Message was not evacuated from memory", + bytesEvacuatedFromMemory > originalBytesEvacuatedFromMemory); + + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + Message receivedMessage = consumer.receive(getReceiveTimeout()); + assertNotNull("Did not receive message", receivedMessage); + assertThat("Unexpected message type", receivedMessage, is(instanceOf(TextMessage.class))); + assertEquals("Unexpected message content", message.getText(), ((TextMessage) receivedMessage).getText()); + } + finally + { + connection.close(); + } + } + + private Destination createQueue(final String queueName, OverflowPolicy overflowPolicy, int messageLimit) + throws Exception + { + final Map<String, Object> arguments = new HashMap<>(); + arguments.put(org.apache.qpid.server.model.Queue.OVERFLOW_POLICY, overflowPolicy.name()); + arguments.put(org.apache.qpid.server.model.Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, messageLimit); + createEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue", arguments); + return createQueue(queueName); + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/cd432bcf/systests/src/test/java/org/apache/qpid/server/queue/FlowToDiskTest.java ---------------------------------------------------------------------- diff --git a/systests/src/test/java/org/apache/qpid/server/queue/FlowToDiskTest.java b/systests/src/test/java/org/apache/qpid/server/queue/FlowToDiskTest.java deleted file mode 100644 index 4527307..0000000 --- a/systests/src/test/java/org/apache/qpid/server/queue/FlowToDiskTest.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.queue; - -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.qpid.QpidException; -import org.apache.qpid.server.model.OverflowPolicy; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -public class FlowToDiskTest extends QpidBrokerTestCase -{ - private Session _session; - private Queue _queue; - - @Override - protected void setUp() throws Exception - { - super.setUp(); - Connection connection = getConnection(); - connection.start(); - _session = connection.createSession(true, Session.SESSION_TRANSACTED); - - _queue = createQueue(); - } - - private Queue createQueue() throws QpidException, JMSException - { - final Map<String, Object> arguments = new HashMap<String, Object>(); - arguments.put(org.apache.qpid.server.model.Queue.OVERFLOW_POLICY, OverflowPolicy.FLOW_TO_DISK.name()); - arguments.put(org.apache.qpid.server.model.Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 0L); - createEntityUsingAmqpManagement(getTestQueueName(), _session, "org.apache.qpid.Queue", arguments); - return getQueueFromName(_session, getTestQueueName()); - } - - public void testRoundtripWithFlowToDisk() throws Exception - { - Map<String, Object> statistics = (Map<String, Object>) performOperationUsingAmqpManagement("test", "getStatistics", _session, "org.apache.qpid.VirtualHost", Collections.singletonMap("statistics", Collections.singletonList("bytesEvacuatedFromMemory"))); - Long originalBytesEvacuatedFromMemory = (Long) statistics.get("bytesEvacuatedFromMemory"); - - TextMessage message = _session.createTextMessage("testMessage"); - MessageProducer producer = _session.createProducer(_queue); - producer.send(message); - _session.commit(); - - // make sure we are flowing to disk - Map<String, Object> statistics2 = (Map<String, Object>) performOperationUsingAmqpManagement("test", "getStatistics", _session, "org.apache.qpid.VirtualHost", Collections.singletonMap("statistics", Collections.singletonList("bytesEvacuatedFromMemory"))); - Long bytesEvacuatedFromMemory = (Long) statistics2.get("bytesEvacuatedFromMemory"); - assertTrue("Message was not evacuated from memory", bytesEvacuatedFromMemory > originalBytesEvacuatedFromMemory); - - MessageConsumer consumer = _session.createConsumer(_queue); - Message receivedMessage = consumer.receive(getReceiveTimeout()); - assertNotNull("Did not receive message", receivedMessage); - assertThat("Unexpected message type", receivedMessage, is(instanceOf(TextMessage.class))); - assertEquals("Unexpected message content", message.getText(), ((TextMessage) receivedMessage).getText()); - } -} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/cd432bcf/systests/src/test/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java ---------------------------------------------------------------------- diff --git a/systests/src/test/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java b/systests/src/test/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java deleted file mode 100644 index 2e689d9..0000000 --- a/systests/src/test/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java +++ /dev/null @@ -1,140 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.test.client.queue; - -import java.util.HashMap; -import java.util.Map; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.qpid.server.model.OverflowPolicy; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -public class QueuePolicyTest extends QpidBrokerTestCase -{ - private Connection _connection; - - @Override - public void setUp() throws Exception - { - super.setUp(); - _connection = getConnection(); - } - - public void testRejectPolicyMessageDepth() throws Exception - { - Session session = _connection.createSession(true, Session.SESSION_TRANSACTED); - - Destination destination = createQueue(session, OverflowPolicy.REJECT, 5); - MessageProducer producer = session.createProducer(destination); - - for (int i = 0; i < 5; i++) - { - producer.send(session.createMessage()); - session.commit(); - } - - try - { - producer.send(session.createMessage()); - session.commit(); - fail("The client did not receive an exception after exceeding the queue limit"); - } - catch (JMSException e) - { - if (isJavaBroker()) - { - assertTrue("Unexpected exception: " + e.getMessage(), - e.getMessage().contains("Maximum depth exceeded")); - } - } - - Connection secondConnection = getConnection(); - secondConnection.start(); - - Session secondSession = secondConnection.createSession(true, Session.SESSION_TRANSACTED); - MessageConsumer consumer = secondSession.createConsumer(destination); - Message receivedMessage = consumer.receive(getReceiveTimeout()); - assertNotNull("Message is not received", receivedMessage); - secondSession.commit(); - - MessageProducer secondProducer = secondSession.createProducer(destination); - secondProducer.send(secondSession.createMessage()); - secondSession.commit(); - } - - public void testRingPolicy() throws Exception - { - Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - Destination destination = createQueue(session, OverflowPolicy.RING, 2); - MessageProducer producer = session.createProducer(destination); - producer.send(session.createTextMessage("Test1")); - producer.send(session.createTextMessage("Test2")); - producer.send(session.createTextMessage("Test3")); - - MessageConsumer consumer = session.createConsumer(destination); - _connection.start(); - - TextMessage receivedMessage = (TextMessage) consumer.receive(getReceiveTimeout()); - assertNotNull("The consumer should receive the receivedMessage with body='Test2'", receivedMessage); - assertEquals("Unexpected first message", "Test2", receivedMessage.getText()); - - receivedMessage = (TextMessage) consumer.receive(getReceiveTimeout()); - assertNotNull("The consumer should receive the receivedMessage with body='Test3'", receivedMessage); - assertEquals("Unexpected second message", "Test3", receivedMessage.getText()); - } - - - private Destination createQueue(Session session, OverflowPolicy overflowPolicy, int msgLimit) - throws Exception - { - Destination destination; - String testQueueName = getTestQueueName(); - if (isBroker10()) - { - final Map<String, Object> arguments = new HashMap<>(); - arguments.put(org.apache.qpid.server.model.Queue.OVERFLOW_POLICY, overflowPolicy.name()); - arguments.put(org.apache.qpid.server.model.Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, msgLimit); - createEntityUsingAmqpManagement(testQueueName, session, "org.apache.qpid.Queue", arguments); - destination = getQueueFromName(session, testQueueName); - } - else - { - String address = String.format("ADDR: %s; {create: always, node: {" - + "x-bindings: [{exchange : 'amq.direct', key : %s}]," - + "x-declare:{arguments : {'qpid.policy_type': %s, 'qpid.max_count': %d}}" - + "}}", - testQueueName, - testQueueName, overflowPolicy.name().toLowerCase(), msgLimit); - destination = session.createQueue(address); - session.createConsumer(destination).close(); - } - return destination; - } -} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/cd432bcf/test-profiles/CPPExcludes ---------------------------------------------------------------------- diff --git a/test-profiles/CPPExcludes b/test-profiles/CPPExcludes index 8fc0e03..1842076 100755 --- a/test-profiles/CPPExcludes +++ b/test-profiles/CPPExcludes @@ -180,8 +180,6 @@ org.apache.qpid.systests.jms_2_0.* # Exclude the AMQP 1.0 protocol test suite org.apache.qpid.tests.protocol.v1_0.* -org.apache.qpid.server.queue.FlowToDiskTest#* - # Tests require AMQP management org.apache.qpid.server.routing.AlternateBindingRoutingTest#* org.apache.qpid.server.queue.QueueDepthWithSelectorTest#test http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/cd432bcf/test-profiles/Java10UninvestigatedTestsExcludes ---------------------------------------------------------------------- diff --git a/test-profiles/Java10UninvestigatedTestsExcludes b/test-profiles/Java10UninvestigatedTestsExcludes index 9dd40a0..d436d43 100644 --- a/test-profiles/Java10UninvestigatedTestsExcludes +++ b/test-profiles/Java10UninvestigatedTestsExcludes @@ -20,8 +20,6 @@ // This file should eventually be removed as all the systests are moved to either // working, defined as broken, or excluded as they test version specific functionality -QPID-XXXX: It could be a broker bug. The issue requires further investigation -org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicyMessageDepth http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/cd432bcf/test-profiles/JavaTransientExcludes ---------------------------------------------------------------------- diff --git a/test-profiles/JavaTransientExcludes b/test-profiles/JavaTransientExcludes index 964aefd..ec48a4a 100644 --- a/test-profiles/JavaTransientExcludes +++ b/test-profiles/JavaTransientExcludes @@ -41,4 +41,3 @@ org.apache.qpid.client.failover.FailoverBehaviourTest#testFlowControlFlagResetOn org.apache.qpid.client.failover.FailoverBehaviourTest#testFailoverHandlerTimeoutReconnected org.apache.qpid.server.failover.FailoverMethodTest#testNoFailover -org.apache.qpid.server.queue.FlowToDiskTest#* --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
