Repository: qpid-jms
Updated Branches:
  refs/heads/master 80f33fc6a -> 592969627


https://issues.apache.org/jira/browse/QPIDJMS-96

Added in expired message filtering with disable option and tests.  Docs
updated with new option. 

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/59296962
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/59296962
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/59296962

Branch: refs/heads/master
Commit: 5929696277cf8f04b1cb85815d3d4596e356690d
Parents: 80f33fc
Author: Timothy Bish <[email protected]>
Authored: Fri Aug 21 16:02:21 2015 -0400
Committer: Timothy Bish <[email protected]>
Committed: Fri Aug 21 16:02:21 2015 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java |   9 +
 .../apache/qpid/jms/JmsConnectionFactory.java   |  20 ++
 .../org/apache/qpid/jms/JmsMessageConsumer.java |  51 +++-
 .../java/org/apache/qpid/jms/JmsSession.java    |   9 +
 .../org/apache/qpid/jms/message/JmsMessage.java |   5 +
 .../apache/qpid/jms/meta/JmsConsumerInfo.java   |   9 +
 .../apache/qpid/jms/meta/JmsSessionInfo.java    |   9 +
 .../qpid/jms/provider/amqp/AmqpConsumer.java    |   2 +
 .../jms/provider/failover/FailoverProvider.java |   4 +-
 .../jms/integration/MessageIntegrationTest.java |   2 +-
 qpid-jms-docs/Configuration.md                  |   2 +
 .../JmsExpiredMessageConsumptionTest.java       | 266 +++++++++++++++++++
 12 files changed, 375 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59296962/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index be334e3..39f97ab 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -101,6 +101,7 @@ public class JmsConnection implements Connection, 
TopicConnection, QueueConnecti
     private boolean localMessagePriority;
     private boolean clientIdSet;
     private boolean sendAcksAsync;
+    private boolean consumerExpiryCheckEnabled;
     private ExceptionListener exceptionListener;
 
     private final ThreadPoolExecutor executor;
@@ -978,6 +979,14 @@ public class JmsConnection implements Connection, 
TopicConnection, QueueConnecti
         this.sendAcksAsync = sendAcksAsync;
     }
 
+    public boolean isConsumerExpiryCheckEnabled() {
+        return consumerExpiryCheckEnabled;
+    }
+
+    public void setConsumerExpiryCheckEnabled(boolean 
consumerExpiryCheckEnabled) {
+        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
+    }
+
     //----- Async event handlers 
---------------------------------------------//
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59296962/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
index a71df04..abd5f93 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
@@ -60,6 +60,7 @@ public class JmsConnectionFactory extends JNDIStorable 
implements ConnectionFact
     private boolean alwaysSyncSend;
     private boolean sendAcksAsync;
     private boolean localMessagePriority;
+    private boolean consumerExpiryCheckEnabled = true;
     private String queuePrefix = null;
     private String topicPrefix = null;
     private boolean validatePropertyNames = true;
@@ -647,4 +648,23 @@ public class JmsConnectionFactory extends JNDIStorable 
implements ConnectionFact
     public void setSendAcksAsync(boolean sendAcksAsync) {
         this.sendAcksAsync = sendAcksAsync;
     }
+
+    /**
+     * @return true if MessageConsumer instance will check for expired 
messages before dispatch.
+     */
+    public boolean isConsumerExpiryCheckEnabled() {
+        return consumerExpiryCheckEnabled;
+    }
+
+    /**
+     * Controls whether message expiration checking is done in each 
MessageConsumer
+     * prior to dispatching a message.  Disabling this check can lead to 
consumption
+     * of expired messages.
+     *
+     * @param consumerExpiryCheckEnabled
+     *        controls whether expiration checking is done prior to dispatch.
+     */
+    public void setConsumerExpiryCheckEnabled(boolean 
consumerExpiryCheckEnabled) {
+        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59296962/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index e8a7066..b3ae7f3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -119,6 +119,7 @@ public class JmsMessageConsumer implements MessageConsumer, 
JmsMessageAvailableC
         consumerInfo.setBrowser(isBrowser());
         consumerInfo.setPrefetchSize(getConfiguredPrefetch(destination, 
policy));
         consumerInfo.setRedeliveryPolicy(redeliveryPolicy);
+        
consumerInfo.setConsumerExpiryCheckEnabled(session.isConsumerExpiryCheckEnabled());
 
         session.getConnection().createResource(consumerInfo);
     }
@@ -296,6 +297,13 @@ public class JmsMessageConsumer implements 
MessageConsumer, JmsMessageAvailableC
                 } else if (envelope.getMessage() == null) {
                     LOG.trace("{} no message was available for this consumer: 
{}", getConsumerId());
                     return null;
+                } else if (consumeExpiredMessage(envelope)) {
+                    LOG.trace("{} filtered expired message: {}", 
getConsumerId(), envelope);
+                    doAckExpired(envelope);
+                    if (timeout > 0) {
+                        timeout = Math.max(deadline - 
System.currentTimeMillis(), 0);
+                    }
+                    sendPullCommand(timeout);
                 } else if (redeliveryExceeded(envelope)) {
                     LOG.debug("{} received with excessive redelivered: {}", 
getConsumerId(), envelope);
                     // TODO - Future
@@ -317,6 +325,14 @@ public class JmsMessageConsumer implements 
MessageConsumer, JmsMessageAvailableC
         }
     }
 
+    private boolean consumeExpiredMessage(JmsInboundMessageDispatch dispatch) {
+        if (!isBrowser() && consumerInfo.isConsumerExpiryCheckEnabled() && 
dispatch.getMessage().isExpired()) {
+            return true;
+        }
+
+        return false;
+    }
+
     protected boolean redeliveryExceeded(JmsInboundMessageDispatch envelope) {
         // TODO - Future
         // Check for message that have been redelivered to see if they exceed
@@ -384,6 +400,15 @@ public class JmsMessageConsumer implements 
MessageConsumer, JmsMessageAvailableC
         return envelope;
     }
 
+    private void doAckExpired(final JmsInboundMessageDispatch envelope) throws 
JMSException {
+        try {
+            session.acknowledge(envelope, ACK_TYPE.EXPIRED);
+        } catch (JMSException ex) {
+            session.onException(ex);
+            throw ex;
+        }
+    }
+
     private void doAckReleased(final JmsInboundMessageDispatch envelope) 
throws JMSException {
         try {
             session.acknowledge(envelope, ACK_TYPE.RELEASED);
@@ -661,19 +686,25 @@ public class JmsMessageConsumer implements 
MessageConsumer, JmsMessageAvailableC
             while (session.isStarted() && (envelope = 
messageQueue.dequeueNoWait()) != null) {
                 try {
                     JmsMessage copy = null;
-                    boolean autoAckOrDupsOk = acknowledgementMode == 
Session.AUTO_ACKNOWLEDGE ||
-                                              acknowledgementMode == 
Session.DUPS_OK_ACKNOWLEDGE;
-                    if (autoAckOrDupsOk) {
-                        copy = copy(doAckDelivered(envelope));
+
+                    if (consumeExpiredMessage(envelope)) {
+                        LOG.trace("{} filtered expired message: {}", 
getConsumerId(), envelope);
+                        doAckExpired(envelope);
                     } else {
-                        copy = copy(ackFromReceive(envelope));
-                    }
-                    session.clearSessionRecovered();
+                        boolean autoAckOrDupsOk = acknowledgementMode == 
Session.AUTO_ACKNOWLEDGE ||
+                                                  acknowledgementMode == 
Session.DUPS_OK_ACKNOWLEDGE;
+                        if (autoAckOrDupsOk) {
+                            copy = copy(doAckDelivered(envelope));
+                        } else {
+                            copy = copy(ackFromReceive(envelope));
+                        }
+                        session.clearSessionRecovered();
 
-                    messageListener.onMessage(copy);
+                        messageListener.onMessage(copy);
 
-                    if (autoAckOrDupsOk && !session.isSessionRecovered()) {
-                        doAckConsumed(envelope);
+                        if (autoAckOrDupsOk && !session.isSessionRecovered()) {
+                            doAckConsumed(envelope);
+                        }
                     }
                 } catch (Exception e) {
                     // TODO - We need to handle exception of on message with 
some other

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59296962/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 19d86e5..dea51ad 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -117,6 +117,7 @@ public class JmsSession implements Session, QueueSession, 
TopicSession, JmsMessa
         this.sessionInfo = new JmsSessionInfo(sessionId);
         this.sessionInfo.setAcknowledgementMode(acknowledgementMode);
         this.sessionInfo.setSendAcksAsync(connection.isSendAcksAsync());
+        
this.sessionInfo.setConsumerExpiryCheckEnabled(connection.isConsumerExpiryCheckEnabled());
 
         connection.createResource(sessionInfo);
     }
@@ -805,6 +806,14 @@ public class JmsSession implements Session, QueueSession, 
TopicSession, JmsMessa
         return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
     }
 
+    public boolean isConsumerExpiryCheckEnabled() {
+        return sessionInfo.isConsumerExpiryCheckEnabled();
+    }
+
+    public void setConsumerExpiryCheckEnabled(boolean 
consumerExpiryCheckEnabled) {
+        sessionInfo.setConsumerExpiryCheckEnabled(consumerExpiryCheckEnabled);
+    }
+
     protected void checkClosed() throws IllegalStateException {
         if (closed.get()) {
             IllegalStateException jmsEx = null;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59296962/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
index 82f74b2..fda38c0 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
@@ -522,6 +522,11 @@ public class JmsMessage implements javax.jms.Message {
         return this.facade;
     }
 
+    public boolean isExpired() {
+        long expireTime = facade.getExpiration();
+        return expireTime > 0 && System.currentTimeMillis() > expireTime;
+    }
+
     @Override
     public String toString() {
         return "JmsMessage { " + facade + " }";

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59296962/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
index a7e7ad0..4504434 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
@@ -31,6 +31,7 @@ public final class JmsConsumerInfo implements JmsResource, 
Comparable<JmsConsume
     protected String subscriptionName;
     protected boolean noLocal;
     protected int acknowledgementMode;
+    protected boolean consumerExpiryCheckEnabled;
 
     protected JmsRedeliveryPolicy redeliveryPolicy;
 
@@ -153,6 +154,14 @@ public final class JmsConsumerInfo implements JmsResource, 
Comparable<JmsConsume
         this.acknowledgementMode = acknowledgementMode;
     }
 
+    public boolean isConsumerExpiryCheckEnabled() {
+        return consumerExpiryCheckEnabled;
+    }
+
+    public void setConsumerExpiryCheckEnabled(boolean 
consumerExpiryCheckEnabled) {
+        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
+    }
+
     public JmsRedeliveryPolicy getRedeliveryPolicy() {
         return redeliveryPolicy;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59296962/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java
index a88e5e7..b23eefa 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java
@@ -25,6 +25,7 @@ public final class JmsSessionInfo implements JmsResource, 
Comparable<JmsSessionI
     private final JmsSessionId sessionId;
     private int acknowledgementMode;
     private boolean sendAcksAsync;
+    private boolean consumerExpiryCheckEnabled;
 
     public JmsSessionInfo(JmsConnectionInfo connectionInfo, long sessionId) {
         if (connectionInfo == null) {
@@ -81,6 +82,14 @@ public final class JmsSessionInfo implements JmsResource, 
Comparable<JmsSessionI
         this.sendAcksAsync = sendAcksAsync;
     }
 
+    public boolean isConsumerExpiryCheckEnabled() {
+        return consumerExpiryCheckEnabled;
+    }
+
+    public void setConsumerExpiryCheckEnabled(boolean 
consumerExpiryCheckEnabled) {
+        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
+    }
+
     @Override
     public String toString() {
         return ToStringSupport.toString(this);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59296962/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 6160b8d..7710ec2 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -346,6 +346,8 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
             }
         } else if (ackType.equals(ACK_TYPE.POISONED)) {
             deliveryFailed(delivery);
+        } else if (ackType.equals(ACK_TYPE.EXPIRED)) {
+            deliveryFailed(delivery);
         } else if (ackType.equals(ACK_TYPE.RELEASED)) {
             delivery.disposition(Released.getInstance());
             delivery.settle();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59296962/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index 8fd44e6..56efdf1 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -288,7 +288,7 @@ public class FailoverProvider extends 
DefaultProviderListener implements Provide
         final FailoverRequest pending = new FailoverRequest(request) {
             @Override
             public void doTask() throws IOException, JMSException, 
UnsupportedOperationException {
-                if(resourceId instanceof JmsConnectionInfo) {
+                if (resourceId instanceof JmsConnectionInfo) {
                    closingConnection.set(true);
                 }
                 provider.destroy(resourceId, this);
@@ -645,7 +645,7 @@ public class FailoverProvider extends 
DefaultProviderListener implements Provide
                 if (reconnectLimit != UNLIMITED && reconnectAttempts >= 
reconnectLimit) {
                     LOG.error("Failed to connect after: " + reconnectAttempts 
+ " attempt(s)");
                     failed.set(true);
-                    if(failure == null) {
+                    if (failure == null) {
                         failureCause = new IOException("Failed to connect 
after: " + reconnectAttempts + " attempt(s)");
                     } else {
                         failureCause = IOExceptionSupport.create(failure);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59296962/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
index ad7dd36..490358f 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
@@ -1284,7 +1284,7 @@ public class MessageIntegrationTest extends 
QpidJmsTestCase
             Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
             Queue queue = session.createQueue("myQueue");
 
-            long timestamp = System.currentTimeMillis();
+            long timestamp = System.currentTimeMillis() + 5000;
 
             PropertiesDescribedType props = new PropertiesDescribedType();
             props.setAbsoluteExpiryTime(new Date(timestamp));

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59296962/qpid-jms-docs/Configuration.md
----------------------------------------------------------------------
diff --git a/qpid-jms-docs/Configuration.md b/qpid-jms-docs/Configuration.md
index 92adc5e..ae5bb19 100644
--- a/qpid-jms-docs/Configuration.md
+++ b/qpid-jms-docs/Configuration.md
@@ -87,6 +87,8 @@ The options apply to the behaviour of the JMS objects such as 
Connection, Sessio
 + **jms.connectTimeout** Timeout value that controls how long the client waits 
on Connection establishment before returning with an error. (By default the 
client waits 15 seconds for a connection to be established before failing).
 + **jms.clientIDPrefix** Optional prefix value that is used for generated 
Client ID values when a new Connection is created for the JMS 
ConnectionFactory.  The default prefix is 'ID:'.
 + **jms.connectionIDPrefix** Optional prefix value that is used for generated 
Connection ID values when a new Connection is created for the JMS 
ConnectionFactory.  This connection ID is used when logging some information 
from the JMS Connection object so a configurable prefix can make breadcrumbing 
the logs easier.  The default prefix is 'ID:'.
++ **jms.consumerExpiryCheckEnabled** Controls whether a MessageConsumer 
instances will filter expired Messages
+or deliver them.  By default this value is set to true and expired messages 
will be filtered.
 
 These values control how many messages the remote peer can send to the client 
and be held in a prefetch buffer for each consumer instance.
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/59296962/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsExpiredMessageConsumptionTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsExpiredMessageConsumptionTest.java
 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsExpiredMessageConsumptionTest.java
new file mode 100644
index 0000000..8bea583
--- /dev/null
+++ 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsExpiredMessageConsumptionTest.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.apache.qpid.jms.support.Wait;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JmsExpiredMessageConsumptionTest extends AmqpTestSupport {
+
+    protected static final Logger LOG = 
LoggerFactory.getLogger(JmsMessageConsumerTest.class);
+
+    @Override
+    protected void configureBrokerPolicies(BrokerService broker) {
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setExpireMessagesPeriod(60000);
+        defaultEntry.setUseCache(false);
+        policyMap.setDefaultEntry(defaultEntry);
+
+        broker.setDestinationPolicy(policyMap);
+    }
+
+    @Test(timeout = 60000)
+    public void testConsumerExpiredMessageSync() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+
+        MessageConsumer consumer = session.createConsumer(queue);
+        MessageProducer producer = session.createProducer(queue);
+
+        producer.setTimeToLive(500);
+        producer.send(session.createTextMessage("Message-1"));
+
+        producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
+        producer.send(session.createTextMessage("Message-2"));
+
+        TimeUnit.MILLISECONDS.sleep(800);
+
+        Message received = consumer.receive(5000);
+        assertNotNull(received);
+        TextMessage textMessage = (TextMessage) received;
+        assertTrue(textMessage.getText().endsWith("2"));
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertTrue("Queued message not consumed.", Wait.waitFor(new 
Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return proxy.getQueueSize() == 0;
+            }
+        }));
+
+        final QueueViewMBean dlqProxy = getProxyToQueue("ActiveMQ.DLQ");
+        assertTrue("Queued message did not get sent to DLQ.", Wait.waitFor(new 
Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return dlqProxy.getQueueSize() == 1;
+            }
+        }));
+    }
+
+    @Test(timeout = 60000)
+    public void testConsumerExpiredMessageAsync() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        final CountDownLatch success = new CountDownLatch(1);
+
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+
+        MessageConsumer consumer = session.createConsumer(queue);
+        MessageProducer producer = session.createProducer(queue);
+
+        producer.setTimeToLive(500);
+        producer.send(session.createTextMessage("Message-1"));
+
+        producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
+        producer.send(session.createTextMessage("Message-2"));
+
+        TimeUnit.MILLISECONDS.sleep(800);
+
+        consumer.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message incoming) {
+                TextMessage textMessage = (TextMessage) incoming;
+                try {
+                    if (textMessage.getText().endsWith("2")) {
+                        success.countDown();
+                    }
+                } catch (JMSException e) {
+                }
+            }
+        });
+
+        assertTrue("didn't get expected message", success.await(5, 
TimeUnit.SECONDS));
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertTrue("Queued message not consumed.", Wait.waitFor(new 
Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return proxy.getQueueSize() == 0;
+            }
+        }));
+    }
+
+    @Test(timeout = 60000)
+    public void testConsumerExpirationFilterDisabledSync() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        JmsConnection jmsConnection = (JmsConnection) connection;
+        jmsConnection.setConsumerExpiryCheckEnabled(false);
+
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+
+        MessageConsumer consumer = session.createConsumer(queue);
+        MessageProducer producer = session.createProducer(queue);
+
+        producer.setTimeToLive(500);
+        producer.send(session.createTextMessage("Message-1"));
+
+        TimeUnit.MILLISECONDS.sleep(800);
+
+        Message received = consumer.receive(5000);
+        assertNotNull(received);
+        TextMessage textMessage = (TextMessage) received;
+        assertTrue(textMessage.getText().endsWith("1"));
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertTrue("Queued message not consumed.", Wait.waitFor(new 
Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return proxy.getQueueSize() == 0;
+            }
+        }));
+    }
+
+    @Test(timeout = 60000)
+    public void testConsumerExpirationFilterDisabledAsync() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        JmsConnection jmsConnection = (JmsConnection) connection;
+        jmsConnection.setConsumerExpiryCheckEnabled(false);
+
+        final CountDownLatch success = new CountDownLatch(1);
+
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+
+        MessageConsumer consumer = session.createConsumer(queue);
+        MessageProducer producer = session.createProducer(queue);
+
+        producer.setTimeToLive(500);
+        producer.send(session.createTextMessage("Message-1"));
+
+        TimeUnit.MILLISECONDS.sleep(800);
+
+        consumer.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message incoming) {
+                TextMessage textMessage = (TextMessage) incoming;
+                try {
+                    if (textMessage.getText().endsWith("1")) {
+                        success.countDown();
+                    }
+                } catch (JMSException e) {
+                }
+            }
+        });
+
+        assertTrue("didn't get expected message", success.await(5, 
TimeUnit.SECONDS));
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertTrue("Queued message not consumed.", Wait.waitFor(new 
Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return proxy.getQueueSize() == 0;
+            }
+        }));
+    }
+
+    @Test(timeout=20000)
+    public void testConsumerReceivePrefetchZeroMessageExpiredInFlight() throws 
Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        JmsConnection jmsConnection = (JmsConnection) connection;
+        jmsConnection.getPrefetchPolicy().setAll(0);
+
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        TextMessage expiredMessage = session.createTextMessage("expired 
message");
+        TextMessage validMessage = session.createTextMessage("valid message");
+        producer.send(expiredMessage, Message.DEFAULT_DELIVERY_MODE, 
Message.DEFAULT_PRIORITY, 50);
+        producer.send(validMessage);
+        session.close();
+
+        session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message message = consumer.receive(3000);
+        assertNotNull(message);
+        TextMessage received = (TextMessage) message;
+        assertEquals("expired message", received.getText());
+
+        // Rollback allow the first message to expire.
+        session.rollback();
+        Thread.sleep(75);
+
+        // Consume again, this should fetch the second valid message via a 
pull.
+        message = consumer.receive(3000);
+        assertNotNull(message);
+        received = (TextMessage) message;
+        assertEquals("valid message", received.getText());
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to