Repository: qpid-broker-j Updated Branches: refs/heads/master 910f439bd -> 377315ba5
QPID-6933: [System Tests] Move TimeToLiveTest into JMS 1.1 system tests Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/377315ba Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/377315ba Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/377315ba Branch: refs/heads/master Commit: 377315ba5d272d0c4125b6f8deb88b8e1de4c38e Parents: 910f439 Author: Alex Rudyy <[email protected]> Authored: Fri Dec 8 11:28:08 2017 +0000 Committer: Alex Rudyy <[email protected]> Committed: Fri Dec 8 11:28:08 2017 +0000 ---------------------------------------------------------------------- .../org/apache/qpid/systests/JmsTestBase.java | 6 + .../jms_1_1/message/TimeToLiveTest.java | 182 +++++++++ .../jms_1_1/topic/DurableSubscribtionTest.java | 4 +- .../jms_1_1/topic/TopicSessionTest.java | 7 - .../jms_1_1/topic/TopicSubscriberTest.java | 2 +- .../qpid/server/queue/TimeToLiveTest.java | 399 ------------------- test-profiles/CPPExcludes | 4 - 7 files changed, 191 insertions(+), 413 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/377315ba/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsTestBase.java ---------------------------------------------------------------------- diff --git a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsTestBase.java b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsTestBase.java index 9be3f1c..806d35b 100644 --- a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsTestBase.java +++ b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsTestBase.java @@ -33,6 +33,7 @@ import javax.jms.JMSException; import javax.jms.Queue; import javax.jms.Session; import javax.jms.Topic; +import javax.jms.TopicConnection; import javax.naming.NamingException; import org.junit.BeforeClass; @@ -194,4 +195,9 @@ public abstract class JmsTestBase extends BrokerAdminUsingTestBase return (Map<String, Object>) statistics; } + + protected TopicConnection getTopicConnection() throws JMSException, NamingException + { + return (TopicConnection) getConnection(); + } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/377315ba/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/message/TimeToLiveTest.java ---------------------------------------------------------------------- diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/message/TimeToLiveTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/message/TimeToLiveTest.java new file mode 100644 index 0000000..db6b01f --- /dev/null +++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/message/TimeToLiveTest.java @@ -0,0 +1,182 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.systests.jms_1_1.message; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicSubscriber; + +import org.junit.Test; + +import org.apache.qpid.systests.JmsTestBase; + +public class TimeToLiveTest extends JmsTestBase +{ + @Test + public void testPassiveTTL() throws Exception + { + Queue queue = createQueue(getTestName()); + Connection connection = getConnection(); + long timeToLiveMillis = getReceiveTimeout(); + try + { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(queue); + producer.setTimeToLive(timeToLiveMillis); + producer.send(session.createTextMessage("A")); + producer.setTimeToLive(0); + producer.send(session.createTextMessage("B")); + session.commit(); + + Thread.sleep(timeToLiveMillis); + + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + Message message = consumer.receive(getReceiveTimeout()); + + assertTrue("TextMessage should be received", message instanceof TextMessage); + assertEquals("Unexpected message received", "B", ((TextMessage)message).getText()); + } + finally + { + connection.close(); + } + } + + @Test + public void testActiveTTL() throws Exception + { + Queue queue = createQueue(getTestName()); + Connection connection = getConnection(); + long timeToLiveMillis = getReceiveTimeout() * 2; + try + { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(queue); + producer.setTimeToLive(timeToLiveMillis); + producer.send(session.createTextMessage("A")); + producer.setTimeToLive(0); + producer.send(session.createTextMessage("B")); + session.commit(); + + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + Message message = consumer.receive(getReceiveTimeout()); + + assertTrue("TextMessage should be received", message instanceof TextMessage); + assertEquals("Unexpected message received", "A", ((TextMessage) message).getText()); + + Thread.sleep(timeToLiveMillis); + + session.rollback(); + message = consumer.receive(getReceiveTimeout()); + + assertTrue("TextMessage should be received after waiting for TTL", message instanceof TextMessage); + assertEquals("Unexpected message received after waiting for TTL", "B", ((TextMessage) message).getText()); + } + finally + { + connection.close(); + } + } + + @Test + public void testPassiveTTLWithDurableSubscription() throws Exception + { + long timeToLiveMillis = getReceiveTimeout() * 2; + String subscriptionName = getTestName() + "_sub"; + Topic topic = createTopic(getTestName()); + TopicConnection connection = getTopicConnection(); + try + { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, subscriptionName); + MessageProducer producer = session.createProducer(topic); + producer.setTimeToLive(timeToLiveMillis); + producer.send(session.createTextMessage("A")); + producer.setTimeToLive(0); + producer.send(session.createTextMessage("B")); + session.commit(); + + connection.start(); + Message message = durableSubscriber.receive(getReceiveTimeout()); + + assertTrue("TextMessage should be received", message instanceof TextMessage); + assertEquals("Unexpected message received", "A", ((TextMessage)message).getText()); + + Thread.sleep(timeToLiveMillis); + + session.rollback(); + message = durableSubscriber.receive(getReceiveTimeout()); + + assertTrue("TextMessage should be received after waiting for TTL", message instanceof TextMessage); + assertEquals("Unexpected message received after waiting for TTL", "B", ((TextMessage) message).getText()); + } + finally + { + connection.close(); + } + } + + @Test + public void testActiveTTLWithDurableSubscription() throws Exception + { + long timeToLiveMillis = getReceiveTimeout(); + String subscriptionName = getTestName() + "_sub"; + Topic topic = createTopic(getTestName()); + TopicConnection connection = getTopicConnection(); + try + { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, subscriptionName); + MessageProducer producer = session.createProducer(topic); + producer.setTimeToLive(timeToLiveMillis); + producer.send(session.createTextMessage("A")); + producer.setTimeToLive(0); + producer.send(session.createTextMessage("B")); + session.commit(); + + Thread.sleep(timeToLiveMillis); + + connection.start(); + Message message = durableSubscriber.receive(getReceiveTimeout()); + + assertTrue("TextMessage should be received", message instanceof TextMessage); + assertEquals("Unexpected message received", "B", ((TextMessage)message).getText()); + } + finally + { + connection.close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/377315ba/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/DurableSubscribtionTest.java ---------------------------------------------------------------------- diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/DurableSubscribtionTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/DurableSubscribtionTest.java index 249dc18..c759207 100644 --- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/DurableSubscribtionTest.java +++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/DurableSubscribtionTest.java @@ -619,7 +619,7 @@ public class DurableSubscribtionTest extends JmsTestBase getBrokerAdmin().restart(); - TopicConnection publisherConnection = (TopicConnection) getConnection(); + TopicConnection publisherConnection = getTopicConnection(); try { TopicSession session = publisherConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); @@ -740,7 +740,7 @@ public class DurableSubscribtionTest extends JmsTestBase } //send messages matching and not matching the original used selector - TopicConnection publisherConnection = (TopicConnection) getConnection(); + TopicConnection publisherConnection = getTopicConnection(); try { TopicSession session = publisherConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/377315ba/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/TopicSessionTest.java ---------------------------------------------------------------------- diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/TopicSessionTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/TopicSessionTest.java index 3ea05d7..f31533c 100644 --- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/TopicSessionTest.java +++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/TopicSessionTest.java @@ -22,14 +22,12 @@ package org.apache.qpid.systests.jms_1_1.topic; import static org.junit.Assert.fail; -import javax.jms.JMSException; import javax.jms.Queue; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicPublisher; import javax.jms.TopicSession; -import javax.naming.NamingException; import org.junit.Test; @@ -124,9 +122,4 @@ public class TopicSessionTest extends JmsTestBase } } - private TopicConnection getTopicConnection() throws JMSException, NamingException - { - return (TopicConnection)getConnection(); - } - } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/377315ba/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/TopicSubscriberTest.java ---------------------------------------------------------------------- diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/TopicSubscriberTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/TopicSubscriberTest.java index 18d40e8..bfa19f9 100644 --- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/TopicSubscriberTest.java +++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/TopicSubscriberTest.java @@ -47,7 +47,7 @@ public class TopicSubscriberTest extends JmsTestBase public void messageDeliveredToAllSubscribers() throws Exception { Topic topic = createTopic(getTestName()); - final TopicConnection connection = (TopicConnection) getConnection(); + final TopicConnection connection = getTopicConnection(); try { final TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/377315ba/systests/src/test/java/org/apache/qpid/server/queue/TimeToLiveTest.java ---------------------------------------------------------------------- diff --git a/systests/src/test/java/org/apache/qpid/server/queue/TimeToLiveTest.java b/systests/src/test/java/org/apache/qpid/server/queue/TimeToLiveTest.java deleted file mode 100644 index 918bdde..0000000 --- a/systests/src/test/java/org/apache/qpid/server/queue/TimeToLiveTest.java +++ /dev/null @@ -1,399 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.queue; - -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicSubscriber; -import javax.naming.NamingException; - -import org.junit.Assert; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -public class TimeToLiveTest extends QpidBrokerTestCase -{ - private static final Logger LOGGER = LoggerFactory.getLogger(TimeToLiveTest.class); - - protected final String QUEUE = "TimeToLiveQueue"; - - private final long TIME_TO_LIVE = 100L; - - private static final int MSG_COUNT = 50; - private static final long SERVER_TTL_TIMEOUT = 60000L; - private long _shortReceiveTimeout; - private long _longReceiveTimeout; - - @Override - public void setUp() throws Exception - { - super.setUp(); - _longReceiveTimeout = getLongReceiveTimeout(); - _shortReceiveTimeout = getShortReceiveTimeout(); - } - - public void testPassiveTTLWithPrefetch() throws Exception - { - doTestPassiveTTL(true); - } - - public void testPassiveTTL() throws Exception - { - doTestPassiveTTL(false); - - } - - private void doTestPassiveTTL(boolean prefetchMessages) throws JMSException, NamingException - { - //Create Client 1 - Connection clientConnection = getConnection(); - - Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = createTestQueue(clientSession, QUEUE); - - - //Create Producer - Connection producerConnection = getConnection(); - - producerConnection.start(); - - // Move to a Transacted session to ensure that all messages have been delivered to broker before - // we start waiting for TTL - Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED); - - MessageProducer producer = producerSession.createProducer(queue); - - MessageConsumer consumer = clientSession.createConsumer(queue); - if(prefetchMessages) - { - clientConnection.start(); - } - - //Set TTL - int msg = 0; - producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer)); - - producer.setTimeToLive(TIME_TO_LIVE); - - for (; msg < MSG_COUNT - 2; msg++) - { - producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer)); - } - - //Reset TTL - producer.setTimeToLive(0L); - producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer)); - - producerSession.commit(); - - - // Ensure we sleep the required amount of time. - ReentrantLock waitLock = new ReentrantLock(); - Condition wait = waitLock.newCondition(); - final long MILLIS = 1000000L; - - long waitTime = TIME_TO_LIVE * MILLIS; - while (waitTime > 0) - { - try - { - waitLock.lock(); - - waitTime = wait.awaitNanos(waitTime); - } - catch (InterruptedException e) - { - //Stop if we are interrupted - fail(e.getMessage()); - } - finally - { - waitLock.unlock(); - } - - } - - if(prefetchMessages) - { - clientConnection.close(); - clientConnection = getConnection(); - - clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - queue = clientSession.createQueue(QUEUE); - consumer = clientSession.createConsumer(queue); - } - - clientConnection.start(); - - Message receivedFirst = consumer.receive(_longReceiveTimeout); - Message receivedSecond = consumer.receive(_longReceiveTimeout); - Message receivedThird = consumer.receive(_shortReceiveTimeout); - - // Log the messages to help diagnosis incase of failure - LOGGER.info("First:"+receivedFirst); - LOGGER.info("Second:"+receivedSecond); - LOGGER.info("Third:"+receivedThird); - - // Only first and last messages sent should survive expiry - Assert.assertNull("More messages received", receivedThird); - - Assert.assertNotNull("First message not received", receivedFirst); - Assert.assertTrue("First message doesn't have first set.", receivedFirst.getBooleanProperty("first")); - Assert.assertEquals("First message has incorrect TTL.", 0L, receivedFirst.getLongProperty("TTL")); - - Assert.assertNotNull("Final message not received", receivedSecond); - Assert.assertFalse("Final message has first set.", receivedSecond.getBooleanProperty("first")); - Assert.assertEquals("Final message has incorrect TTL.", 0L, receivedSecond.getLongProperty("TTL")); - - clientConnection.close(); - - producerConnection.close(); - } - - private Message nextMessage(String msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException - { - Message send = producerSession.createTextMessage("Message " + msg); - send.setBooleanProperty("first", first); - send.setStringProperty("testprop", "TimeToLiveTest"); - send.setLongProperty("TTL", producer.getTimeToLive()); - return send; - } - - - /** - * Tests the expired messages get actively deleted even on queues which have no consumers - * @throws Exception - */ - public void testActiveTTL() throws Exception - { - Connection producerConnection = getConnection(); - Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = createTestQueue(producerSession); - - MessageProducer producer = producerSession.createProducer(queue); - producer.setTimeToLive(1000L); - - // send Messages - for(int i = 0; i < MSG_COUNT; i++) - { - producer.send(producerSession.createTextMessage("Message: "+i)); - } - long failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT; - - // check Queue depth for up to TIMEOUT seconds after the Queue Depth hasn't changed for 100ms. - long messageCount = MSG_COUNT; - long lastPass; - - do - { - lastPass = messageCount; - Thread.sleep(100); - producerConnection.start(); - messageCount = getQueueDepth(producerConnection, queue); - - // If we have received messages in the last loop then extend the timeout time. - // if we get messages stuck that are not expiring then the failureTime will occur - // failing the test. This will help with the scenario when the broker does not - // have enough CPU cycles to process the TTLs. - if (lastPass != messageCount) - { - failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT; - } - } - while(messageCount > 0L && System.currentTimeMillis() < failureTime); - - assertEquals("Messages not automatically expired: ", 0L, messageCount); - - producer.close(); - producerSession.close(); - producerConnection.close(); - } - - public void testPassiveTTLwithDurableSubscription() throws Exception - { - //Create Client 1 - TopicConnection clientConnection = (TopicConnection) getConnection(); - - Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - // Create and close the durable subscriber - Topic topic = createTopic(clientConnection, getTestQueueName()); - TopicSubscriber durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName(),"testprop='TimeToLiveTest'", false); - durableSubscriber.close(); - - //Create Producer - Connection producerConnection = getConnection(); - - producerConnection.start(); - - // Move to a Transacted session to ensure that all messages have been delivered to broker before - // we start waiting for TTL - Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED); - - MessageProducer producer = producerSession.createProducer(topic); - - //Set TTL - int msg = 0; - producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer)); - - producer.setTimeToLive(TIME_TO_LIVE); - - for (; msg < MSG_COUNT - 2; msg++) - { - producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer)); - } - - //Reset TTL - producer.setTimeToLive(0L); - producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer)); - - producerSession.commit(); - - //resubscribe - durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName(),"testprop='TimeToLiveTest'", false); - - // Ensure we sleep the required amount of time. - ReentrantLock waitLock = new ReentrantLock(); - Condition wait = waitLock.newCondition(); - final long MILLIS = 1000000L; - - long waitTime = TIME_TO_LIVE * MILLIS; - while (waitTime > 0) - { - try - { - waitLock.lock(); - - waitTime = wait.awaitNanos(waitTime); - } - catch (InterruptedException e) - { - //Stop if we are interrupted - fail(e.getMessage()); - } - finally - { - waitLock.unlock(); - } - - } - - clientConnection.start(); - - Message receivedFirst = durableSubscriber.receive(_longReceiveTimeout); - Message receivedSecond = durableSubscriber.receive(_longReceiveTimeout); - Message receivedThird = durableSubscriber.receive(getShortReceiveTimeout()); - - // Log the messages to help diagnosis incase of failure - LOGGER.info("First:"+receivedFirst); - LOGGER.info("Second:"+receivedSecond); - LOGGER.info("Third:"+receivedThird); - - // Only first and last messages sent should survive expiry - Assert.assertNull("More messages received", receivedThird); - - Assert.assertNotNull("First message not received", receivedFirst); - Assert.assertTrue("First message doesn't have first set.", receivedFirst.getBooleanProperty("first")); - Assert.assertEquals("First message has incorrect TTL.", 0L, receivedFirst.getLongProperty("TTL")); - - Assert.assertNotNull("Final message not received", receivedSecond); - Assert.assertFalse("Final message has first set.", receivedSecond.getBooleanProperty("first")); - Assert.assertEquals("Final message has incorrect TTL.", 0L, receivedSecond.getLongProperty("TTL")); - - durableSubscriber.close(); - clientSession.unsubscribe(getTestQueueName()); - clientConnection.close(); - - producerConnection.close(); - } - - public void testActiveTTLwithDurableSubscription() throws Exception - { - //Create Client 1 - TopicConnection clientConnection = (TopicConnection) getConnectionBuilder().setClientId("clientid").build(); - Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - // Create and close the durable subscriber - Topic topic = createTopic(clientConnection, getTestQueueName()); - TopicSubscriber durableSubscriber = clientSession.createDurableSubscriber(topic, "MyDurableTTLSubscription","testprop='TimeToLiveTest'", false); - durableSubscriber.close(); - - //Create Producer - Connection producerConnection = getConnection(); - producerConnection.start(); - Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = producerSession.createProducer(topic); - producer.setTimeToLive(1000L); - - // send Messages - for(int i = 0; i < MSG_COUNT; i++) - { - producer.send(producerSession.createTextMessage("Message: "+i)); - } - long failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT; - - // check Queue depth for up to TIMEOUT seconds after the Queue Depth hasn't changed for 100ms. - long messageCount = MSG_COUNT; - long lastPass; - Queue subcriptionQueue = isBroker10() ? producerSession.createQueue("qpidsub_/clientid_/MyDurableTTLSubscription_/durable") : new AMQQueue("amq.topic","clientid" + ":" + "MyDurableTTLSubscription"); - do - { - lastPass = messageCount; - Thread.sleep(100); - messageCount = getQueueDepth(producerConnection, subcriptionQueue); - - // If we have received messages in the last loop then extend the timeout time. - // if we get messages stuck that are not expiring then the failureTime will occur - // failing the test. This will help with the scenario when the broker does not - // have enough CPU cycles to process the TTLs. - if (lastPass != messageCount) - { - failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT; - } - } - while(messageCount > 0L && System.currentTimeMillis() < failureTime); - - assertEquals("Messages not automatically expired: ", 0L, messageCount); - - producer.close(); - producerSession.close(); - producerConnection.close(); - - clientSession.unsubscribe("MyDurableTTLSubscription"); - clientSession.close(); - clientConnection.close(); - } - -} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/377315ba/test-profiles/CPPExcludes ---------------------------------------------------------------------- diff --git a/test-profiles/CPPExcludes b/test-profiles/CPPExcludes index 38b9091..ca9943a 100755 --- a/test-profiles/CPPExcludes +++ b/test-profiles/CPPExcludes @@ -60,10 +60,6 @@ org.apache.qpid.server.exchange.ReturnUnroutableMandatoryMessageTest#* org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testUnsubscribeWhenUsingSelectorMakesTopicUnreachable org.apache.qpid.test.unit.client.connection.ExceptionListenerTest#testExceptionListenerConnectionStopDeadlock -// c++ broker expires messages on delivery or when the queue cleaner thread runs. -org.apache.qpid.server.queue.TimeToLiveTest#testActiveTTL -org.apache.qpid.server.queue.TimeToLiveTest#testActiveTTLwithDurableSubscription - // QPID-1727 , QPID-1726 :c++ broker does not support flow to disk on transient queues. Also it requries a persistent store impl. for Apache org.apache.qpid.test.client.QueueBrowsingFlowToDiskTest#* --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
