Repository: activemq Updated Branches: refs/heads/trunk e3377edb0 -> 642cc4321
https://issues.apache.org/jira/browse/AMQ-5406 Add support for disable of consumer expiration checks. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/642cc432 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/642cc432 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/642cc432 Branch: refs/heads/trunk Commit: 642cc432160067a11ff36dcadd4aeb7c4cbbccdc Parents: e3377ed Author: Timothy Bish <[email protected]> Authored: Wed Oct 22 14:42:45 2014 -0400 Committer: Timothy Bish <[email protected]> Committed: Wed Oct 22 14:42:45 2014 -0400 ---------------------------------------------------------------------- .../org/apache/activemq/ActiveMQConnection.java | 20 +++ .../activemq/ActiveMQConnectionFactory.java | 22 +++ .../activemq/ActiveMQMessageConsumer.java | 14 +- .../apache/activemq/JmsMessageConsumerTest.java | 170 +++++++++++++++++++ 4 files changed, 224 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/642cc432/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index 9252310..68c8344 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -159,6 +159,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon private boolean sendAcksAsync=true; private boolean checkForDuplicates = true; private boolean queueOnlyConnection = false; + private boolean consumerExpiryCheckEnabled = true; private final Transport transport; private final IdGenerator clientIdGenerator; @@ -2709,4 +2710,23 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) { this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval; } + + /** + * @return true if MessageConsumer instance will check for expired messages before dispatch. + */ + public boolean isConsumerExpiryCheckEnabled() { + return consumerExpiryCheckEnabled; + } + + /** + * Controls whether message expiration checking is done in each MessageConsumer + * prior to dispatching a message. Disabling this check can lead to consumption + * of expired messages. + * + * @param consumerExpiryCheckEnabled + * controls whether expiration checking is done prior to dispatch. + */ + public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) { + this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/642cc432/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java index 35d4a69..3354ab3 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java @@ -34,6 +34,7 @@ import javax.jms.QueueConnectionFactory; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.naming.Context; + import org.apache.activemq.blob.BlobTransferPolicy; import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap; import org.apache.activemq.jndi.JNDIBaseStorable; @@ -180,6 +181,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne private RejectedExecutionHandler rejectedTaskHandler = null; protected int xaAckMode = -1; // ensure default init before setting via brokerUrl introspection in sub class private boolean rmIdFromConnectionId = false; + private boolean consumerExpiryCheckEnabled = true; // ///////////////////////////////////////////// // @@ -403,6 +405,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne connection.setRejectedTaskHandler(getRejectedTaskHandler()); connection.setNestedMapAndListEnabled(isNestedMapAndListEnabled()); connection.setRmIdFromConnectionId(isRmIdFromConnectionId()); + connection.setConsumerExpiryCheckEnabled(isConsumerExpiryCheckEnabled()); if (transportListener != null) { connection.addTransportListener(transportListener); } @@ -824,6 +827,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne props.setProperty("nestedMapAndListEnabled", Boolean.toString(isNestedMapAndListEnabled())); props.setProperty("consumerFailoverRedeliveryWaitPeriod", Long.toString(getConsumerFailoverRedeliveryWaitPeriod())); props.setProperty("rmIdFromConnectionId", Boolean.toString(isRmIdFromConnectionId())); + props.setProperty("consumerExpiryCheckEnabled", Boolean.toString(isConsumerExpiryCheckEnabled())); } public boolean isUseCompression() { @@ -1222,4 +1226,22 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne this.rmIdFromConnectionId = rmIdFromConnectionId; } + /** + * @return true if MessageConsumer instance will check for expired messages before dispatch. + */ + public boolean isConsumerExpiryCheckEnabled() { + return consumerExpiryCheckEnabled; + } + + /** + * Controls whether message expiration checking is done in each MessageConsumer + * prior to dispatching a message. Disabling this check can lead to consumption + * of expired messages. + * + * @param consumerExpiryCheckEnabled + * controls whether expiration checking is done prior to dispatch. + */ + public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) { + this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/642cc432/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index a60a7ac..02dbf49 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -158,6 +158,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC private long failoverRedeliveryWaitPeriod = 0; private boolean transactedIndividualAck = false; private boolean nonBlockingRedelivery = false; + private boolean consumerExpiryCheckEnabled = true; /** * Create a MessageConsumer @@ -267,6 +268,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod(); this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery(); this.transactedIndividualAck = session.connection.isTransactedIndividualAck() || this.nonBlockingRedelivery; + this.consumerExpiryCheckEnabled = session.connection.isConsumerExpiryCheckEnabled(); if (messageListener != null) { setMessageListener(messageListener); } @@ -488,7 +490,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } else if (md.getMessage() == null) { return null; - } else if (md.getMessage().isExpired()) { + } else if (isConsumerExpiryCheckEnabled() && md.getMessage().isExpired()) { if (LOG.isDebugEnabled()) { LOG.debug(getConsumerId() + " received expired message: " + md); } @@ -1385,7 +1387,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC ActiveMQMessage message = createActiveMQMessage(md); beforeMessageIsConsumed(md); try { - boolean expired = message.isExpired(); + boolean expired = isConsumerExpiryCheckEnabled() && message.isExpired(); if (!expired) { listener.onMessage(message); } @@ -1626,4 +1628,12 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC public boolean hasMessageListener() { return messageListener.get() != null; } + + public boolean isConsumerExpiryCheckEnabled() { + return consumerExpiryCheckEnabled; + } + + public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) { + this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/642cc432/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMessageConsumerTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMessageConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMessageConsumerTest.java new file mode 100644 index 0000000..908de0d --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMessageConsumerTest.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.broker.BrokerService; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +public class JmsMessageConsumerTest { + + private BrokerService brokerService; + private String brokerURI; + + @Rule public TestName name = new TestName(); + + @Before + public void startBroker() throws Exception { + brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.setUseJmx(false); + brokerService.start(); + brokerService.waitUntilStarted(); + + brokerURI = "vm://localhost?create=false"; + } + + @After + public void stopBroker() throws Exception { + if (brokerService != null) { + brokerService.stop(); + } + } + + @Test + public void testSyncReceiveWithExpirationChecks() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI); + + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + producer.setTimeToLive(TimeUnit.SECONDS.toMillis(2)); + connection.start(); + + producer.send(session.createTextMessage("test")); + + // Allow message to expire in the prefetch buffer + TimeUnit.SECONDS.sleep(4); + + assertNull(consumer.receive(1000)); + connection.close(); + } + + @Test + public void testSyncReceiveWithIgnoreExpirationChecks() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI); + factory.setConsumerExpiryCheckEnabled(false); + + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + producer.setTimeToLive(TimeUnit.SECONDS.toMillis(2)); + connection.start(); + + producer.send(session.createTextMessage("test")); + + // Allow message to expire in the prefetch buffer + TimeUnit.SECONDS.sleep(4); + + assertNotNull(consumer.receive(1000)); + connection.close(); + } + + @Test + public void testAsyncReceiveWithExpirationChecks() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI); + + final CountDownLatch received = new CountDownLatch(1); + + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + received.countDown(); + } + }); + MessageProducer producer = session.createProducer(destination); + producer.setTimeToLive(TimeUnit.SECONDS.toMillis(2)); + + producer.send(session.createTextMessage("test")); + + // Allow message to expire in the prefetch buffer + TimeUnit.SECONDS.sleep(4); + connection.start(); + + assertFalse(received.await(1, TimeUnit.SECONDS)); + connection.close(); + } + + @Test + public void testAsyncReceiveWithoutExpirationChecks() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI); + factory.setConsumerExpiryCheckEnabled(false); + + final CountDownLatch received = new CountDownLatch(1); + + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + received.countDown(); + } + }); + MessageProducer producer = session.createProducer(destination); + producer.setTimeToLive(TimeUnit.SECONDS.toMillis(2)); + + producer.send(session.createTextMessage("test")); + + // Allow message to expire in the prefetch buffer + TimeUnit.SECONDS.sleep(4); + connection.start(); + + assertTrue(received.await(5, TimeUnit.SECONDS)); + connection.close(); + } +}
