Repository: qpid-jms Updated Branches: refs/heads/master 80f33fc6a -> 592969627
https://issues.apache.org/jira/browse/QPIDJMS-96 Added in expired message filtering with disable option and tests. Docs updated with new option. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/59296962 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/59296962 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/59296962 Branch: refs/heads/master Commit: 5929696277cf8f04b1cb85815d3d4596e356690d Parents: 80f33fc Author: Timothy Bish <[email protected]> Authored: Fri Aug 21 16:02:21 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Fri Aug 21 16:02:21 2015 -0400 ---------------------------------------------------------------------- .../java/org/apache/qpid/jms/JmsConnection.java | 9 + .../apache/qpid/jms/JmsConnectionFactory.java | 20 ++ .../org/apache/qpid/jms/JmsMessageConsumer.java | 51 +++- .../java/org/apache/qpid/jms/JmsSession.java | 9 + .../org/apache/qpid/jms/message/JmsMessage.java | 5 + .../apache/qpid/jms/meta/JmsConsumerInfo.java | 9 + .../apache/qpid/jms/meta/JmsSessionInfo.java | 9 + .../qpid/jms/provider/amqp/AmqpConsumer.java | 2 + .../jms/provider/failover/FailoverProvider.java | 4 +- .../jms/integration/MessageIntegrationTest.java | 2 +- qpid-jms-docs/Configuration.md | 2 + .../JmsExpiredMessageConsumptionTest.java | 266 +++++++++++++++++++ 12 files changed, 375 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59296962/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java index be334e3..39f97ab 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java +++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java @@ -101,6 +101,7 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti private boolean localMessagePriority; private boolean clientIdSet; private boolean sendAcksAsync; + private boolean consumerExpiryCheckEnabled; private ExceptionListener exceptionListener; private final ThreadPoolExecutor executor; @@ -978,6 +979,14 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti this.sendAcksAsync = sendAcksAsync; } + public boolean isConsumerExpiryCheckEnabled() { + return consumerExpiryCheckEnabled; + } + + public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) { + this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled; + } + //----- Async event handlers ---------------------------------------------// @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59296962/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java index a71df04..abd5f93 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java @@ -60,6 +60,7 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact private boolean alwaysSyncSend; private boolean sendAcksAsync; private boolean localMessagePriority; + private boolean consumerExpiryCheckEnabled = true; private String queuePrefix = null; private String topicPrefix = null; private boolean validatePropertyNames = true; @@ -647,4 +648,23 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact public void setSendAcksAsync(boolean sendAcksAsync) { this.sendAcksAsync = 
sendAcksAsync; } + + /** + * @return true if MessageConsumer instance will check for expired messages before dispatch. + */ + public boolean isConsumerExpiryCheckEnabled() { + return consumerExpiryCheckEnabled; + } + + /** + * Controls whether message expiration checking is done in each MessageConsumer + * prior to dispatching a message. Disabling this check can lead to consumption + * of expired messages. + * + * @param consumerExpiryCheckEnabled + * controls whether expiration checking is done prior to dispatch. + */ + public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) { + this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled; + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59296962/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java index e8a7066..b3ae7f3 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java @@ -119,6 +119,7 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC consumerInfo.setBrowser(isBrowser()); consumerInfo.setPrefetchSize(getConfiguredPrefetch(destination, policy)); consumerInfo.setRedeliveryPolicy(redeliveryPolicy); + consumerInfo.setConsumerExpiryCheckEnabled(session.isConsumerExpiryCheckEnabled()); session.getConnection().createResource(consumerInfo); } @@ -296,6 +297,13 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC } else if (envelope.getMessage() == null) { LOG.trace("{} no message was available for this consumer: {}", getConsumerId()); return null; + } else if (consumeExpiredMessage(envelope)) { + LOG.trace("{} filtered expired message: {}", getConsumerId(), 
envelope); + doAckExpired(envelope); + if (timeout > 0) { + timeout = Math.max(deadline - System.currentTimeMillis(), 0); + } + sendPullCommand(timeout); } else if (redeliveryExceeded(envelope)) { LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), envelope); // TODO - Future @@ -317,6 +325,14 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC } } + private boolean consumeExpiredMessage(JmsInboundMessageDispatch dispatch) { + if (!isBrowser() && consumerInfo.isConsumerExpiryCheckEnabled() && dispatch.getMessage().isExpired()) { + return true; + } + + return false; + } + protected boolean redeliveryExceeded(JmsInboundMessageDispatch envelope) { // TODO - Future // Check for message that have been redelivered to see if they exceed @@ -384,6 +400,15 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC return envelope; } + private void doAckExpired(final JmsInboundMessageDispatch envelope) throws JMSException { + try { + session.acknowledge(envelope, ACK_TYPE.EXPIRED); + } catch (JMSException ex) { + session.onException(ex); + throw ex; + } + } + private void doAckReleased(final JmsInboundMessageDispatch envelope) throws JMSException { try { session.acknowledge(envelope, ACK_TYPE.RELEASED); @@ -661,19 +686,25 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC while (session.isStarted() && (envelope = messageQueue.dequeueNoWait()) != null) { try { JmsMessage copy = null; - boolean autoAckOrDupsOk = acknowledgementMode == Session.AUTO_ACKNOWLEDGE || - acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE; - if (autoAckOrDupsOk) { - copy = copy(doAckDelivered(envelope)); + + if (consumeExpiredMessage(envelope)) { + LOG.trace("{} filtered expired message: {}", getConsumerId(), envelope); + doAckExpired(envelope); } else { - copy = copy(ackFromReceive(envelope)); - } - session.clearSessionRecovered(); + boolean autoAckOrDupsOk = acknowledgementMode == 
Session.AUTO_ACKNOWLEDGE || + acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE; + if (autoAckOrDupsOk) { + copy = copy(doAckDelivered(envelope)); + } else { + copy = copy(ackFromReceive(envelope)); + } + session.clearSessionRecovered(); - messageListener.onMessage(copy); + messageListener.onMessage(copy); - if (autoAckOrDupsOk && !session.isSessionRecovered()) { - doAckConsumed(envelope); + if (autoAckOrDupsOk && !session.isSessionRecovered()) { + doAckConsumed(envelope); + } } } catch (Exception e) { // TODO - We need to handle exception of on message with some other http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59296962/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index 19d86e5..dea51ad 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -117,6 +117,7 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa this.sessionInfo = new JmsSessionInfo(sessionId); this.sessionInfo.setAcknowledgementMode(acknowledgementMode); this.sessionInfo.setSendAcksAsync(connection.isSendAcksAsync()); + this.sessionInfo.setConsumerExpiryCheckEnabled(connection.isConsumerExpiryCheckEnabled()); connection.createResource(sessionInfo); } @@ -805,6 +806,14 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE; } + public boolean isConsumerExpiryCheckEnabled() { + return sessionInfo.isConsumerExpiryCheckEnabled(); + } + + public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) { + sessionInfo.setConsumerExpiryCheckEnabled(consumerExpiryCheckEnabled); + } + protected void checkClosed() throws 
IllegalStateException { if (closed.get()) { IllegalStateException jmsEx = null; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59296962/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java index 82f74b2..fda38c0 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java @@ -522,6 +522,11 @@ public class JmsMessage implements javax.jms.Message { return this.facade; } + public boolean isExpired() { + long expireTime = facade.getExpiration(); + return expireTime > 0 && System.currentTimeMillis() > expireTime; + } + @Override public String toString() { return "JmsMessage { " + facade + " }"; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59296962/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java index a7e7ad0..4504434 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java @@ -31,6 +31,7 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume protected String subscriptionName; protected boolean noLocal; protected int acknowledgementMode; + protected boolean consumerExpiryCheckEnabled; protected JmsRedeliveryPolicy redeliveryPolicy; @@ -153,6 +154,14 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume this.acknowledgementMode = acknowledgementMode; } + public boolean 
isConsumerExpiryCheckEnabled() { + return consumerExpiryCheckEnabled; + } + + public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) { + this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled; + } + public JmsRedeliveryPolicy getRedeliveryPolicy() { return redeliveryPolicy; } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59296962/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java index a88e5e7..b23eefa 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java @@ -25,6 +25,7 @@ public final class JmsSessionInfo implements JmsResource, Comparable<JmsSessionI private final JmsSessionId sessionId; private int acknowledgementMode; private boolean sendAcksAsync; + private boolean consumerExpiryCheckEnabled; public JmsSessionInfo(JmsConnectionInfo connectionInfo, long sessionId) { if (connectionInfo == null) { @@ -81,6 +82,14 @@ public final class JmsSessionInfo implements JmsResource, Comparable<JmsSessionI this.sendAcksAsync = sendAcksAsync; } + public boolean isConsumerExpiryCheckEnabled() { + return consumerExpiryCheckEnabled; + } + + public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) { + this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled; + } + @Override public String toString() { return ToStringSupport.toString(this); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59296962/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index 6160b8d..7710ec2 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -346,6 +346,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver } } else if (ackType.equals(ACK_TYPE.POISONED)) { deliveryFailed(delivery); + } else if (ackType.equals(ACK_TYPE.EXPIRED)) { + deliveryFailed(delivery); } else if (ackType.equals(ACK_TYPE.RELEASED)) { delivery.disposition(Released.getInstance()); delivery.settle(); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59296962/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java index 8fd44e6..56efdf1 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java @@ -288,7 +288,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide final FailoverRequest pending = new FailoverRequest(request) { @Override public void doTask() throws IOException, JMSException, UnsupportedOperationException { - if(resourceId instanceof JmsConnectionInfo) { + if (resourceId instanceof JmsConnectionInfo) { closingConnection.set(true); } provider.destroy(resourceId, this); @@ -645,7 +645,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide if (reconnectLimit != UNLIMITED && reconnectAttempts >= reconnectLimit) { LOG.error("Failed to connect after: " + reconnectAttempts + " attempt(s)"); failed.set(true); - 
if(failure == null) { + if (failure == null) { failureCause = new IOException("Failed to connect after: " + reconnectAttempts + " attempt(s)"); } else { failureCause = IOExceptionSupport.create(failure); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59296962/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java index ad7dd36..490358f 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java @@ -1284,7 +1284,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myQueue"); - long timestamp = System.currentTimeMillis(); + long timestamp = System.currentTimeMillis() + 5000; PropertiesDescribedType props = new PropertiesDescribedType(); props.setAbsoluteExpiryTime(new Date(timestamp)); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59296962/qpid-jms-docs/Configuration.md ---------------------------------------------------------------------- diff --git a/qpid-jms-docs/Configuration.md b/qpid-jms-docs/Configuration.md index 92adc5e..ae5bb19 100644 --- a/qpid-jms-docs/Configuration.md +++ b/qpid-jms-docs/Configuration.md @@ -87,6 +87,8 @@ The options apply to the behaviour of the JMS objects such as Connection, Sessio + **jms.connectTimeout** Timeout value that controls how long the client waits on Connection establishment before returning with an error. (By default the client waits 15 seconds for a connection to be established before failing). 
+ **jms.clientIDPrefix** Optional prefix value that is used for generated Client ID values when a new Connection is created for the JMS ConnectionFactory. The default prefix is 'ID:'. + **jms.connectionIDPrefix** Optional prefix value that is used for generated Connection ID values when a new Connection is created for the JMS ConnectionFactory. This connection ID is used when logging some information from the JMS Connection object so a configurable prefix can make breadcrumbing the logs easier. The default prefix is 'ID:'. ++ **jms.consumerExpiryCheckEnabled** Controls whether MessageConsumer instances will filter expired Messages +or deliver them. By default this value is set to true and expired messages will be filtered. These values control how many messages the remote peer can send to the client and be held in a prefetch buffer for each consumer instance. http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59296962/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsExpiredMessageConsumptionTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsExpiredMessageConsumptionTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsExpiredMessageConsumptionTest.java new file mode 100644 index 0000000..8bea583 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsExpiredMessageConsumptionTest.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.consumer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.apache.qpid.jms.support.Wait; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JmsExpiredMessageConsumptionTest extends AmqpTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger(JmsMessageConsumerTest.class); + + @Override + protected void configureBrokerPolicies(BrokerService broker) { + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setExpireMessagesPeriod(60000); + defaultEntry.setUseCache(false); + 
policyMap.setDefaultEntry(defaultEntry); + + broker.setDestinationPolicy(policyMap); + } + + @Test(timeout = 60000) + public void testConsumerExpiredMessageSync() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.getMethodName()); + + MessageConsumer consumer = session.createConsumer(queue); + MessageProducer producer = session.createProducer(queue); + + producer.setTimeToLive(500); + producer.send(session.createTextMessage("Message-1")); + + producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE); + producer.send(session.createTextMessage("Message-2")); + + TimeUnit.MILLISECONDS.sleep(800); + + Message received = consumer.receive(5000); + assertNotNull(received); + TextMessage textMessage = (TextMessage) received; + assertTrue(textMessage.getText().endsWith("2")); + + final QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return proxy.getQueueSize() == 0; + } + })); + + final QueueViewMBean dlqProxy = getProxyToQueue("ActiveMQ.DLQ"); + assertTrue("Queued message did not get sent to DLQ.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return dlqProxy.getQueueSize() == 1; + } + })); + } + + @Test(timeout = 60000) + public void testConsumerExpiredMessageAsync() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + final CountDownLatch success = new CountDownLatch(1); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.getMethodName()); + + MessageConsumer consumer = session.createConsumer(queue); + MessageProducer producer = session.createProducer(queue); + + producer.setTimeToLive(500); + 
producer.send(session.createTextMessage("Message-1")); + + producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE); + producer.send(session.createTextMessage("Message-2")); + + TimeUnit.MILLISECONDS.sleep(800); + + consumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message incoming) { + TextMessage textMessage = (TextMessage) incoming; + try { + if (textMessage.getText().endsWith("2")) { + success.countDown(); + } + } catch (JMSException e) { + } + } + }); + + assertTrue("didn't get expected message", success.await(5, TimeUnit.SECONDS)); + + final QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return proxy.getQueueSize() == 0; + } + })); + } + + @Test(timeout = 60000) + public void testConsumerExpirationFilterDisabledSync() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + JmsConnection jmsConnection = (JmsConnection) connection; + jmsConnection.setConsumerExpiryCheckEnabled(false); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.getMethodName()); + + MessageConsumer consumer = session.createConsumer(queue); + MessageProducer producer = session.createProducer(queue); + + producer.setTimeToLive(500); + producer.send(session.createTextMessage("Message-1")); + + TimeUnit.MILLISECONDS.sleep(800); + + Message received = consumer.receive(5000); + assertNotNull(received); + TextMessage textMessage = (TextMessage) received; + assertTrue(textMessage.getText().endsWith("1")); + + final QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return proxy.getQueueSize() == 0; + } + })); + } + + @Test(timeout = 60000) + 
public void testConsumerExpirationFilterDisabledAsync() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + JmsConnection jmsConnection = (JmsConnection) connection; + jmsConnection.setConsumerExpiryCheckEnabled(false); + + final CountDownLatch success = new CountDownLatch(1); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.getMethodName()); + + MessageConsumer consumer = session.createConsumer(queue); + MessageProducer producer = session.createProducer(queue); + + producer.setTimeToLive(500); + producer.send(session.createTextMessage("Message-1")); + + TimeUnit.MILLISECONDS.sleep(800); + + consumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message incoming) { + TextMessage textMessage = (TextMessage) incoming; + try { + if (textMessage.getText().endsWith("1")) { + success.countDown(); + } + } catch (JMSException e) { + } + } + }); + + assertTrue("didn't get expected message", success.await(5, TimeUnit.SECONDS)); + + final QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return proxy.getQueueSize() == 0; + } + })); + } + + @Test(timeout=20000) + public void testConsumerReceivePrefetchZeroMessageExpiredInFlight() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + JmsConnection jmsConnection = (JmsConnection) connection; + jmsConnection.getPrefetchPolicy().setAll(0); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.getMethodName()); + MessageProducer producer = session.createProducer(queue); + TextMessage expiredMessage = session.createTextMessage("expired message"); + TextMessage validMessage = session.createTextMessage("valid message"); + 
+ producer.send(expiredMessage, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, 50); + producer.send(validMessage); + session.close(); + + session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = session.createConsumer(queue); + Message message = consumer.receive(3000); + assertNotNull(message); + TextMessage received = (TextMessage) message; + assertEquals("expired message", received.getText()); + + // Roll back and allow the first message to expire. + session.rollback(); + Thread.sleep(75); + + // Consume again, this should fetch the second valid message via a pull. + message = consumer.receive(3000); + assertNotNull(message); + received = (TextMessage) message; + assertEquals("valid message", received.getText()); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
