QPID-6933: [System Tests] Refactor Prefetch tests
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/37365f91 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/37365f91 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/37365f91 Branch: refs/heads/master Commit: 37365f91801ffbf070f0580fa7fda7ec91f66fd0 Parents: 988006b Author: Keith Wall <[email protected]> Authored: Mon Dec 25 17:36:14 2017 +0000 Committer: Keith Wall <[email protected]> Committed: Tue Dec 26 18:11:18 2017 +0000 ---------------------------------------------------------------------- .../java/org/apache/qpid/systests/Utils.java | 4 +- .../extensions/prefetch/PrefetchTest.java | 254 ++++++++++++++++ .../client/prefetch/PrefetchBehaviourTest.java | 297 ------------------- .../qpid/systest/prefetch/ZeroPrefetchTest.java | 81 ----- test-profiles/CPPExcludes | 2 - test-profiles/Java010Excludes | 2 - test-profiles/Java10UninvestigatedTestsExcludes | 4 +- test-profiles/JavaPre010Excludes | 3 - 8 files changed, 257 insertions(+), 390 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37365f91/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/Utils.java ---------------------------------------------------------------------- diff --git a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/Utils.java b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/Utils.java index c2dfe75..01e75f1 100644 --- a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/Utils.java +++ b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/Utils.java @@ -52,13 +52,13 @@ public class Utils } } - public static void sendMessages(final Connection connection, final Destination destination, final int messageNumber) + public static void sendMessages(final Connection connection, final Destination destination, final int count) throws JMSException { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); try { - sendMessages(session, destination, messageNumber); + sendMessages(session, destination, count); } finally { http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37365f91/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/prefetch/PrefetchTest.java ---------------------------------------------------------------------- diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/prefetch/PrefetchTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/prefetch/PrefetchTest.java new file mode 100644 index 0000000..728148e --- /dev/null +++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/prefetch/PrefetchTest.java @@ -0,0 +1,254 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systests.jms_1_1.extensions.prefetch; + +import static junit.framework.TestCase.assertEquals; +import static org.apache.qpid.server.model.Protocol.AMQP_0_8; +import static org.apache.qpid.server.model.Protocol.AMQP_0_9; +import static org.apache.qpid.server.model.Protocol.AMQP_0_9_1; +import static org.apache.qpid.systests.Utils.INDEX; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assume.assumeThat; + +import java.util.EnumSet; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.naming.NamingException; + +import org.junit.Test; + +import org.apache.qpid.server.model.Protocol; +import org.apache.qpid.systests.JmsTestBase; +import org.apache.qpid.systests.Utils; + +public class PrefetchTest extends JmsTestBase +{ + private static final EnumSet<Protocol> PRE_010_PROTOCOLS = EnumSet.of(AMQP_0_8, AMQP_0_9, AMQP_0_9_1); + + @Test + public void prefetch() throws Exception + { + Connection connection1 = getConnectionBuilder().setPrefetch(3).build(); + Queue queue = createQueue(getTestName()); + try + { + connection1.start(); + + final Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer1 = session1.createConsumer(queue); + + Utils.sendMessages(connection1, queue, 6); + + final Message receivedMessage = consumer1.receive(getReceiveTimeout()); + assertNotNull("First message was not received", receivedMessage); + assertEquals("Received message has unexpected index", 0, receivedMessage.getIntProperty(INDEX)); + + forceSync(session1); + + observeNextAvailableMessage(queue, 4); + } + finally + { + connection1.close(); + } + } + + /** + * send two messages to the queue, consume and acknowledge one message on connection 1 + * create a second connection and attempt to consume the second message - this will only be possible + * if the first connection has no prefetch + */ + @Test + public void prefetchDisabled() throws Exception + { + Connection connection1 = getConnectionBuilder().setPrefetch(0).build(); + Queue queue = createQueue(getTestName()); + try + { + connection1.start(); + + final Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer1 = session1.createConsumer(queue); + + Utils.sendMessages(connection1, queue, 2); + + final Message receivedMessage = consumer1.receive(getReceiveTimeout()); + assertNotNull("First message was not received", receivedMessage); + assertEquals("Message property was not as expected", 0, receivedMessage.getIntProperty(INDEX)); + + observeNextAvailableMessage(queue, 1); + } + finally + { + connection1.close(); + } + } + + @Test + public void connectionStopReleasesPrefetchedMessages() throws Exception + { + assumeThat("Only 0-10 implements this feature", getProtocol(), is(equalTo(Protocol.AMQP_0_10))); + + Connection connection1 = getConnectionBuilder().setPrefetch(3).build(); + Queue queue = createQueue(getTestName()); + try + { + connection1.start(); + + final Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer1 = session1.createConsumer(queue); + + Utils.sendMessages(connection1, queue, 6); + + final Message receivedMessage = consumer1.receive(getReceiveTimeout()); + assertNotNull("First message was not received", receivedMessage); + assertEquals("Received message has unexpected index", 0, receivedMessage.getIntProperty(INDEX)); + + forceSync(session1); + + connection1.stop(); + + observeNextAvailableMessage(queue, 1); + } + finally + { + connection1.close(); + } + } + + @Test + public void consumerCloseReleasesPrefetchedMessages() throws Exception + { + assumeThat("Only 0-10 implements this feature", getProtocol(), is(equalTo(Protocol.AMQP_0_10))); + + Connection connection1 = getConnectionBuilder().setPrefetch(3).build(); + Queue queue = createQueue(getTestName()); + try + { + connection1.start(); + + final Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer1 = session1.createConsumer(queue); + + Utils.sendMessages(connection1, queue, 6); + + final Message receivedMessage = consumer1.receive(getReceiveTimeout()); + assertNotNull("First message was not received", receivedMessage); + assertEquals("Received message has unexpected index", 0, receivedMessage.getIntProperty(INDEX)); + + forceSync(session1); + + consumer1.close(); + + observeNextAvailableMessage(queue, 1); + } + finally + { + connection1.close(); + } + } + + @Test + public void consumeBeyondPrefetch() throws Exception + { + Connection connection1 = getConnectionBuilder().setPrefetch(1).build(); + Queue queue = createQueue(getTestName()); + try + { + connection1.start(); + + final Session session1 = connection1.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer1 = session1.createConsumer(queue); + + Utils.sendMessages(connection1, queue, 5); + + Message message = consumer1.receive(getReceiveTimeout()); + assertNotNull(message); + assertEquals(0, message.getIntProperty(INDEX)); + + message = consumer1.receive(getReceiveTimeout()); + assertNotNull(message); + assertEquals(1, message.getIntProperty(INDEX)); + message = consumer1.receive(getReceiveTimeout()); + assertNotNull(message); + assertEquals(2, message.getIntProperty(INDEX)); + + forceSync(session1); + + // In pre 0-10, in a transaction session the client does not ack the message until the commit occurs + // so the message observed by another connection will have the index 3 rather than 4. + Connection connection2 = getConnection(); + try + { + Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer2 = session2.createConsumer(queue); + connection2.start(); + + message = consumer2.receive(getReceiveTimeout()); + assertNotNull(message); + assertEquals("Received message has unexpected index", + PRE_010_PROTOCOLS.contains(getProtocol()) ? 3 : 4, + message.getIntProperty(INDEX)); + + session2.rollback(); + } + finally + { + connection2.close(); + } + } + finally + { + connection1.close(); + } + } + + private void observeNextAvailableMessage(final Queue queue, final int expectedIndex) throws JMSException, NamingException + { + Connection connection2 = getConnection(); + try + { + connection2.start(); + final Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer2 = session2.createConsumer(queue); + + final Message receivedMessage2 = consumer2.receive(getReceiveTimeout()); + assertNotNull("Observer connection did not receive message", receivedMessage2); + assertEquals("Message received by the observer connection has unexpected index", expectedIndex, receivedMessage2.getIntProperty(INDEX)); + } + finally + { + connection2.close(); + } + } + + private void forceSync(final Session session1) throws Exception + { + session1.createTemporaryQueue().delete(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37365f91/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java ---------------------------------------------------------------------- diff --git a/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java b/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java deleted file mode 100644 index 330012f..0000000 --- a/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java +++ /dev/null @@ -1,297 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.client.prefetch; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.configuration.ClientProperties; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - - -public class PrefetchBehaviourTest extends QpidBrokerTestCase -{ - private static final Logger LOGGER = LoggerFactory.getLogger(PrefetchBehaviourTest.class); - private Connection _normalConnection; - private AtomicBoolean _exceptionCaught; - private CountDownLatch _processingStarted; - private CountDownLatch _processingCompleted; - - @Override - protected void setUp() throws Exception - { - super.setUp(); - _normalConnection = getConnection(); - _exceptionCaught = new AtomicBoolean(); - _processingStarted = new CountDownLatch(1); - _processingCompleted = new CountDownLatch(1); - } - - /** - * Verifies that a slow processing asynchronous transacted consumer with prefetch=1 only - * gets 1 of the messages sent, with the second consumer picking up the others while the - * slow consumer is processing, i.e that prefetch=1 actually does what it says on the tin. - */ - public void testPrefetchOneWithAsynchronousTransactedConsumer() throws Exception - { - final long processingTime = 5000; - - //create a second connection with prefetch set to 1 - setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString()); - Connection prefetch1Connection = getConnection(); - - prefetch1Connection.start(); - _normalConnection.start(); - - //create an asynchronous consumer with simulated slow processing - final Session prefetch1session = prefetch1Connection.createSession(true, Session.SESSION_TRANSACTED); - Queue queue = prefetch1session.createQueue(getTestQueueName()); - MessageConsumer prefetch1consumer = prefetch1session.createConsumer(queue); - prefetch1consumer.setMessageListener(new MessageListener() - { - @Override - public void onMessage(Message message) - { - try - { - LOGGER.debug("starting processing"); - _processingStarted.countDown(); - LOGGER.debug("processing started"); - - //simulate message processing - Thread.sleep(processingTime); - - prefetch1session.commit(); - - _processingCompleted.countDown(); - } - catch(Exception e) - { - LOGGER.error("Exception caught in message listener"); - _exceptionCaught.set(true); - } - } - }); - - //create producer and send 5 messages - Session producerSession = _normalConnection.createSession(true, Session.SESSION_TRANSACTED); - MessageProducer producer = producerSession.createProducer(queue); - - for (int i = 0; i < 5; i++) - { - producer.send(producerSession.createTextMessage("test")); - } - producerSession.commit(); - - //wait for the first message to start being processed by the async consumer - assertTrue("Async processing failed to start in allowed timeframe", _processingStarted.await(2000, TimeUnit.MILLISECONDS)); - LOGGER.debug("proceeding with test"); - - //try to consumer the other messages with another consumer while the async procesisng occurs - Session normalSession = _normalConnection.createSession(true, Session.AUTO_ACKNOWLEDGE); - MessageConsumer normalConsumer = normalSession.createConsumer(queue); - - Message msg; - // Check that other consumer gets the other 4 messages - for (int i = 0; i < 4; i++) - { - msg = normalConsumer.receive(1500); - assertNotNull("Consumer should receive 4 messages",msg); - } - msg = normalConsumer.receive(250); - assertNull("Consumer should not have received a 5th message",msg); - - //wait for the other consumer to finish to ensure it completes ok - LOGGER.debug("waiting for async consumer to complete"); - assertTrue("Async processing failed to complete in allowed timeframe", _processingStarted.await(processingTime + 2000, TimeUnit.MILLISECONDS)); - assertFalse("Unexpected exception during async message processing",_exceptionCaught.get()); - } - - /** - * This test was originally known as AMQConnectionTest#testPrefetchSystemProperty. - * - */ - public void testMessagesAreDistributedBetweenConsumersWithLowPrefetch() throws Exception - { - // This test is flaky. There is no guarantee that the messages have been sent to - // consumerA's prefetch buffer by the time consumerB calls receive(). - Queue queue = getTestQueue(); - - setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString()); - - Connection connection = getConnection(); - connection.start(); - // Create Consumer A - Session consSessA = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumerA = consSessA.createConsumer(queue); - - // ensure message delivery to consumer A is started (required for 0-8..0-9-1) - final Message msg = consumerA.receiveNoWait(); - assertNull(msg); - - Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED); - sendMessage(producerSession, queue, 3); - - // Create Consumer B - MessageConsumer consumerB = null; - if (isBroker010()) - { - // 0-10 prefetch is per consumer so we create Consumer B on the same session as Consumer A - consumerB = consSessA.createConsumer(queue); - } - else - { - // 0-8..0-9-1 prefetch is per session so we create Consumer B on a separate session - Session consSessB = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - consumerB = consSessB.createConsumer(queue); - } - - // As message delivery to consumer A is already started, the first two messages should - // now be with consumer A. The last message will still be on the Broker as consumer A's - // credit is exhausted and message delivery for consumer B is not yet running. - - // As described by QPID-3747, for 0-10 we *must* check Consumer B before Consumer A. - // If we were to reverse the order, the SessionComplete will restore Consumer A's credit, - // and the third message could be delivered to either Consumer A or Consumer B. - - // Check that consumer B gets the last (third) message. - final Message msgConsumerB = consumerB.receive(1500); - assertNotNull("Consumer B should have received a message", msgConsumerB); - assertEquals("Consumer B received message with unexpected index", 2, msgConsumerB.getIntProperty(INDEX)); - - // Now check that consumer A has indeed got the first two messages. - for (int i = 0; i < 2; i++) - { - final Message msgConsumerA = consumerA.receive(1500); - assertNotNull("Consumer A should have received a message " + i, msgConsumerA); - assertEquals("Consumer A received message with unexpected index", i, msgConsumerA.getIntProperty(INDEX)); - } - } - - /** - * Test Goal: Verify if connection stop releases all messages in it's prefetch buffer. - * Test Strategy: Send 10 messages to a queue. Create a consumer with maxprefetch of 5, but never consume them. - * Stop the connection. Create a new connection and a consumer with maxprefetch 10 on the same queue. - * Try to receive all 10 messages. - */ - public void testConnectionStop() throws Exception - { - setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "10"); - Connection con = getConnection(); - con.start(); - Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination queue = ssn.createQueue("ADDR:my-queue;{create: always}"); - - MessageProducer prod = ssn.createProducer(queue); - for (int i=0; i<10;i++) - { - prod.send(ssn.createTextMessage("Msg" + i)); - } - - MessageConsumer consumer = ssn.createConsumer(queue); - // This is to ensure we get the first client to prefetch. - Message msg = consumer.receive(1000); - assertNotNull("The first consumer should get one message",msg); - con.stop(); - - Connection con2 = getConnection(); - con2.start(); - Session ssn2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer2 = ssn2.createConsumer(queue); - for (int i=0; i<9;i++) - { - TextMessage m = (TextMessage)consumer2.receive(1000); - assertNotNull("The second consumer should get 9 messages, but received only " + i,m); - } - } - - public void testPrefetchWindowExpandsOnReceiveTransaction() throws Exception - { - - _normalConnection.start(); - - //create a second connection with prefetch set to 1 - - setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString()); - Connection prefetch1Connection = getConnection(); - Session consumerSession = prefetch1Connection.createSession(true, Session.SESSION_TRANSACTED); - MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue(getTestQueueName())); - - - Session producerSession = _normalConnection.createSession(true, Session.SESSION_TRANSACTED); - Queue queue = producerSession.createQueue(getTestQueueName()); - MessageProducer producer = producerSession.createProducer(queue); - - for (int i = 0; i < 5; i++) - { - producer.send(producerSession.createTextMessage("test")); - } - producerSession.commit(); - - - prefetch1Connection.start(); - - - - Message message = consumer.receive(1000l); - assertNotNull(message); - message = consumer.receive(1000l); - assertNotNull(message); - message = consumer.receive(1000l); - assertNotNull(message); - - - Connection secondConsumerConnection = getConnection(); - Session secondConsumerSession = secondConsumerConnection.createSession(true, Session.SESSION_TRANSACTED); - MessageConsumer secondConsumer = secondConsumerSession.createConsumer(consumerSession.createQueue(getTestQueueName())); - secondConsumerConnection.start(); - - message = secondConsumer.receive(1000l); - assertNotNull(message); - - message = secondConsumer.receive(1000l); - assertNotNull(message); - - consumerSession.commit(); - secondConsumerSession.commit(); - - message = consumer.receive(1000l); - assertNull(message); - - } - - -} - http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37365f91/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java ---------------------------------------------------------------------- diff --git a/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java b/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java deleted file mode 100644 index e313222..0000000 --- a/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.systest.prefetch; - -import java.util.UUID; - -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; - -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -public class ZeroPrefetchTest extends QpidBrokerTestCase -{ - - private static final String TEST_PROPERTY_NAME = "testProp"; - - // send two messages to the queue, consume and acknowledge one message on connection 1 - // create a second connection and attempt to consume the second message - this will only be possible - // if the first connection has no prefetch - public void testZeroPrefetch() throws Exception - { - Connection prefetch1Connection = getConnectionWithPrefetch(0); - - prefetch1Connection.start(); - - final Session prefetch1session = prefetch1Connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = createTestQueue(prefetch1session); - MessageConsumer prefetch1consumer = prefetch1session.createConsumer(queue); - - - Session producerSession = prefetch1Connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = producerSession.createProducer(queue); - Message firstMessage = producerSession.createMessage(); - String firstPropertyValue = UUID.randomUUID().toString(); - firstMessage.setStringProperty(TEST_PROPERTY_NAME, firstPropertyValue); - producer.send(firstMessage); - - Message secondMessage = producerSession.createMessage(); - String secondPropertyValue = UUID.randomUUID().toString(); - secondMessage.setStringProperty(TEST_PROPERTY_NAME, secondPropertyValue); - producer.send(secondMessage); - - - final Message receivedMessage = prefetch1consumer.receive(2000l); - assertNotNull("First message was not received", receivedMessage); - assertEquals("Message property was not as expected", firstPropertyValue, receivedMessage.getStringProperty(TEST_PROPERTY_NAME)); - - Connection prefetch2Connection = getConnectionWithPrefetch(0); - - prefetch2Connection.start(); - final Session prefetch2session = prefetch2Connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer prefetch2consumer = prefetch2session.createConsumer(queue); - - final Message receivedMessage2 = prefetch2consumer.receive(2000l); - assertNotNull("Second message was not received", receivedMessage2); - assertEquals("Message property was not as expected", secondPropertyValue, receivedMessage2.getStringProperty(TEST_PROPERTY_NAME)); - - } -} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37365f91/test-profiles/CPPExcludes ---------------------------------------------------------------------- diff --git a/test-profiles/CPPExcludes b/test-profiles/CPPExcludes index 3c1ef49..dcfc337 100755 --- a/test-profiles/CPPExcludes +++ b/test-profiles/CPPExcludes @@ -167,8 +167,6 @@ org.apache.qpid.systest.MessageCompressionTest#* org.apache.qpid.test.unit.client.AMQSessionTest#testQueueDepthForQueueThatDoesNotExistLegacyBehaviour_08_091 -org.apache.qpid.client.prefetch.PrefetchBehaviourTest#testPrefetchWindowExpandsOnReceiveTransaction - org.apache.qpid.server.queue.ArrivalTimeFilterTest#* org.apache.qpid.server.queue.EnsureNondestructiveConsumersTest#* org.apache.qpid.server.protocol.v0_8.* http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37365f91/test-profiles/Java010Excludes ---------------------------------------------------------------------- diff --git a/test-profiles/Java010Excludes b/test-profiles/Java010Excludes index 4934dd3..d0448fa 100755 --- a/test-profiles/Java010Excludes +++ b/test-profiles/Java010Excludes @@ -55,8 +55,6 @@ org.apache.qpid.server.queue.QueueBindTest#testQueueCanBeReboundOnTopicExchange org.apache.qpid.test.unit.client.AMQSessionTest#testQueueDepthForQueueThatDoesNotExistLegacyBehaviour_08_091 -org.apache.qpid.client.prefetch.PrefetchBehaviourTest#testPrefetchWindowExpandsOnReceiveTransaction - // QPID-6722: Race client side means that session close can end in exception when failover is in progress. org.apache.qpid.client.failover.FailoverBehaviourTest#testConnectionCloseInterruptsFailover org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testConnectionCloseInterruptsFailover http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37365f91/test-profiles/Java10UninvestigatedTestsExcludes ---------------------------------------------------------------------- diff --git a/test-profiles/Java10UninvestigatedTestsExcludes b/test-profiles/Java10UninvestigatedTestsExcludes index 349fb71..4402248 100644 --- a/test-profiles/Java10UninvestigatedTestsExcludes +++ b/test-profiles/Java10UninvestigatedTestsExcludes @@ -20,9 +20,7 @@ // This file should eventually be removed as all the systests are moved to either // working, defined as broken, or excluded as they test version specific functionality -org.apache.qpid.client.prefetch.PrefetchBehaviourTest#* - -QPID-XXXX: It could be a broker bug. The issue requires further inevestigation +QPID-XXXX: It could be a broker bug. The issue requires further investigation org.apache.qpid.systest.AnonymousProducerTest#testPublishIntoNonExistingQueue org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicyMessageDepth http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37365f91/test-profiles/JavaPre010Excludes ---------------------------------------------------------------------- diff --git a/test-profiles/JavaPre010Excludes b/test-profiles/JavaPre010Excludes index d1883a9..d59fab3 100644 --- a/test-profiles/JavaPre010Excludes +++ b/test-profiles/JavaPre010Excludes @@ -55,9 +55,6 @@ org.apache.qpid.test.unit.client.connection.ConnectionTest#testUnsupportedSASLMe org.apache.qpid.test.unit.client.connection.ConnectionTest#testClientIDVerificationForSameUser org.apache.qpid.test.unit.client.connection.ConnectionTest#testClientIDVerificationForDifferentUsers -// QPID-3604 This fix is applied only to the 0-10 code, hence this test does not work for pre 0-10. -org.apache.qpid.client.prefetch.PrefetchBehaviourTest#testConnectionStop - // QPID-3396 org.apache.qpid.test.unit.client.connection.ConnectionTest#testExceptionWhenUserPassIsRequired --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
