Repository: activemq
Updated Branches:
  refs/heads/trunk b1ede0559 -> 8cdb5c2c1


https://issues.apache.org/jira/browse/AMQ-5274 - we now only check expiry on 
non-inflight messages so there is no contention on ack with the periodic expiry 
check thread - related https://issues.apache.org/jira/browse/AMQ-2876


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/26807cd4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/26807cd4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/26807cd4

Branch: refs/heads/trunk
Commit: 26807cd4524460e22844d16136f03bc96fa9b4c8
Parents: b1ede05
Author: gtully <gary.tu...@gmail.com>
Authored: Thu Sep 11 16:13:43 2014 +0100
Committer: gtully <gary.tu...@gmail.com>
Committed: Thu Sep 11 16:13:43 2014 +0100

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |   2 +-
 .../broker/region/QueueSubscription.java        |   8 --
 ...JmsSendReceiveWithMessageExpirationTest.java |  21 ++-
 .../org/apache/activemq/bugs/AMQ5274Test.java   | 133 +++++++++++++++++++
 .../activemq/usecases/ExpiredMessagesTest.java  |   2 +-
 .../ExpiredMessagesWithNoConsumerTest.java      |  21 ++-
 6 files changed, 170 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/26807cd4/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index e9f2180..ff16dfc 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1169,7 +1169,7 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
             List<MessageReference> toExpire) throws Exception {
         for (Iterator<? extends MessageReference> i = refs.iterator(); 
i.hasNext() && l.size() < max;) {
             QueueMessageReference ref = (QueueMessageReference) i.next();
-            if (ref.isExpired()) {
+            if (ref.isExpired() && (ref.getLockOwner() == null)) {
                 toExpire.add(ref);
             } else if (l.contains(ref.getMessage()) == false) {
                 l.add(ref.getMessage());

http://git-wip-us.apache.org/repos/asf/activemq/blob/26807cd4/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
index 7c7027f..358f946 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
@@ -52,14 +52,6 @@ public class QueueSubscription extends PrefetchSubscription 
implements LockOwner
         final Destination q = (Destination) n.getRegionDestination();
         final QueueMessageReference node = (QueueMessageReference)n;
         final Queue queue = (Queue)q;
-
-        if (n.isExpired()) {
-            // sync with message expiry processing
-            if (!broker.isExpired(n)) {
-                LOG.debug("ignoring ack {}, for already expired message: {}", 
ack, n);
-                return;
-            }
-        }
         queue.removeMessage(context, this, node, ack);
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/26807cd4/activemq-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
index 956fa40..391253e 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
@@ -30,6 +30,10 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.Topic;
 
+import org.apache.activemq.broker.BrokerRegistry;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -151,7 +155,7 @@ public class JmsSendReceiveWithMessageExpirationTest 
extends TestSupport {
              received.acknowledge();
          };
 
-         assertEquals("got messages", messageCount + 1, messages.size());
+         assertEquals("got all (normal plus one with ttl) messages", 
messageCount + 1, messages.size());
 
          Vector<Message> dlqMessages = new Vector<Message>();
          while ((received = dlqConsumer.receive(1000)) != null) {
@@ -159,6 +163,21 @@ public class JmsSendReceiveWithMessageExpirationTest 
extends TestSupport {
          };
 
          assertEquals("got dlq messages", data.length - 1, dlqMessages.size());
+
+         final DestinationStatistics view = 
getDestinationStatistics(BrokerRegistry.getInstance().findFirst(), 
ActiveMQDestination.transform(consumerDestination));
+
+         // wait for all to inflight to expire
+         assertTrue("all inflight messages expired ", Wait.waitFor(new 
Wait.Condition() {
+             @Override
+             public boolean isSatisified() throws Exception {
+                 return view.getInflight().getCount() == 0;
+             }
+         }));
+         assertEquals("Wrong inFlightCount: ", 0, 
view.getInflight().getCount());
+
+         LOG.info("Stats: received: "  + messages.size() + ", messages: " + 
view.getMessages().getCount() + ", enqueues: " + view.getEnqueues().getCount() 
+ ", dequeues: " + view.getDequeues().getCount()
+                 + ", dispatched: " + view.getDispatched().getCount() + ", 
inflight: " + view.getInflight().getCount() + ", expired: " + 
view.getExpired().getCount());
+
     }
     
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/26807cd4/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java
new file mode 100644
index 0000000..4ba6526
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class AMQ5274Test {
+    static Logger LOG = LoggerFactory.getLogger(AMQ5274Test.class);
+    String activemqURL;
+    BrokerService brokerService;
+    ActiveMQQueue dest = new ActiveMQQueue("TestQ");
+
+
+    @Before
+    public void startBroker() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultPolicy = new PolicyEntry();
+        defaultPolicy.setExpireMessagesPeriod(1000);
+        policyMap.setDefaultEntry(defaultPolicy);
+        brokerService.setDestinationPolicy(policyMap);
+        activemqURL = 
brokerService.addConnector("tcp://localhost:0").getPublishableConnectString();
+        brokerService.start();
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+        }
+    }
+
+    @Test
+    public void test() throws Exception {
+        LOG.info("Starting Test");
+        assertTrue(brokerService.isStarted());
+
+        produce();
+        consumeAndRollback();
+
+        // check reported queue size using JMX
+        long queueSize = getQueueSize();
+        assertEquals("Queue " + dest.getPhysicalName() + " not empty, 
reporting " + queueSize + " messages.", 0, queueSize);
+    }
+
+    private void consumeAndRollback() throws JMSException, 
InterruptedException {
+        ActiveMQConnection connection = createConnection();
+        RedeliveryPolicy noRedelivery = new RedeliveryPolicy();
+        noRedelivery.setMaximumRedeliveries(0);
+        connection.setRedeliveryPolicy(noRedelivery);
+        connection.start();
+        Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+        MessageConsumer consumer = session.createConsumer(dest);
+        Message m;
+        while ( (m = consumer.receive(4000)) != null) {
+            LOG.info("Got:" + m);
+            TimeUnit.SECONDS.sleep(1);
+            session.rollback();
+        }
+        connection.close();
+    }
+
+    private void produce() throws Exception {
+        Connection connection = createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(dest);
+        producer.setTimeToLive(10000);
+        for (int i=0;i<20;i++) {
+            producer.send(session.createTextMessage("i="+i));
+        }
+       connection.close();
+    }
+
+    private ActiveMQConnection createConnection() throws JMSException {
+        return (ActiveMQConnection) new 
ActiveMQConnectionFactory(activemqURL).createConnection();
+    }
+
+
+    public long getQueueSize() throws Exception {
+        long queueSize = 0;
+        try {
+            QueueViewMBean queueViewMBean = (QueueViewMBean) 
brokerService.getManagementContext().newProxyInstance(BrokerMBeanSupport.createDestinationName(brokerService.getBrokerObjectName(),
 dest), QueueViewMBean.class, false);
+            queueSize = queueViewMBean.getQueueSize();
+            LOG.info("QueueSize for destination {} is {}", dest, queueSize);
+        } catch (Exception ex) {
+           LOG.error("Error retrieving QueueSize from JMX ", ex);
+           throw ex;
+        }
+        return queueSize;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/26807cd4/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
index 4c97972..0205599 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
@@ -190,7 +190,7 @@ public class ExpiredMessagesTest extends 
CombinationTestSupport {
 
         // memory check
         assertEquals("memory usage is back to duck egg", 0, 
getDestination(broker, destination).getMemoryUsage().getPercentUsage());
-        assertTrue("memory usage is increased ", 0 < getDestination(broker, 
dlqDestination).getMemoryUsage().getPercentUsage());
+        assertTrue("memory usage is increased ", 0 < getDestination(broker, 
dlqDestination).getMemoryUsage().getUsage());
 
         // verify DLQ
         MessageConsumer dlqConsumer = createDlqConsumer(connection);

http://git-wip-us.apache.org/repos/asf/activemq/blob/26807cd4/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
index ebdb5bc..e2ad7f6 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
@@ -257,7 +257,7 @@ public class ExpiredMessagesWithNoConsumerTest extends 
CombinationTestSupport {
     // first ack delivered after expiry
     public void testExpiredMessagesWithVerySlowConsumer() throws Exception {
         createBroker();
-        final long queuePrefetch = 600;
+        final long queuePrefetch = 5;
         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                 connectionUri + "?jms.prefetchPolicy.queuePrefetch=" + 
queuePrefetch);
         connection = factory.createConnection();
@@ -266,7 +266,7 @@ public class ExpiredMessagesWithNoConsumerTest extends 
CombinationTestSupport {
         final int ttl = 4000;
         producer.setTimeToLive(ttl);
 
-        final long sendCount = 1500;
+        final long sendCount = 10;
         final CountDownLatch receivedOneCondition = new CountDownLatch(1);
         final CountDownLatch waitCondition = new CountDownLatch(1);
 
@@ -328,10 +328,14 @@ public class ExpiredMessagesWithNoConsumerTest extends 
CombinationTestSupport {
                 return queuePrefetch == view.getDispatchCount();
             }
         }));
-        assertTrue("Not all sent have expired ", Wait.waitFor(new 
Wait.Condition() {
+        assertTrue("all non inflight have expired ", Wait.waitFor(new 
Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
-                return sendCount == view.getExpiredCount();
+                LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + 
view.getDequeueCount()
+                        + ", inflight=" + view.getInFlightCount() + ", 
expired= " + view.getExpiredCount()
+                        + ", size= " + view.getQueueSize());
+
+                return view.getExpiredCount() > 0 && (view.getEnqueueCount() - 
view.getInFlightCount()) == view.getExpiredCount();
             }
         }));
 
@@ -448,10 +452,15 @@ public class ExpiredMessagesWithNoConsumerTest extends 
CombinationTestSupport {
                 return queuePrefetch == view.getDispatchCount();
             }
         }));
-        assertTrue("All have not sent have expired ", Wait.waitFor(new 
Wait.Condition() {
+
+        assertTrue("all non inflight have expired ", Wait.waitFor(new 
Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
-                return sendCount == view.getExpiredCount();
+                LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + 
view.getDequeueCount()
+                        + ", inflight=" + view.getInFlightCount() + ", 
expired= " + view.getExpiredCount()
+                        + ", size= " + view.getQueueSize());
+
+                return view.getExpiredCount() > 0 && (view.getEnqueueCount() - 
view.getInFlightCount()) == view.getExpiredCount();
             }
         }));
 

Reply via email to