Repository: activemq
Updated Branches:
  refs/heads/master 004be5612 -> 57a6f2373


AMQ-6854 - ensure non null return in the event on no space to page in


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/57a6f237
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/57a6f237
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/57a6f237

Branch: refs/heads/master
Commit: 57a6f2373eb7609ce4ab7cadcd7d38889414a24c
Parents: 004be56
Author: gtully <[email protected]>
Authored: Wed Jun 13 13:10:52 2018 +0100
Committer: gtully <[email protected]>
Committed: Wed Jun 13 13:10:52 2018 +0100

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |   6 +-
 .../broker/jmx/PurgeLargeMessageTest.java       | 172 +++++++++++++++++++
 2 files changed, 174 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/57a6f237/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index d5cddaa..a8463d3 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -2117,12 +2117,10 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
                     LOG.debug("{} cursor blocked, no space available to page 
in messages; usage: {}", this, this.systemUsage.getMemoryUsage());
                 }
             }
-        } else {
-            // Avoid return null list, if condition is not validated
-            resultList = new OrderedPendingList();
         }
 
-        return resultList;
+        // Avoid return null list, if condition is not validated
+        return resultList != null ? resultList : new OrderedPendingList();
     }
 
     private final boolean haveRealConsumer() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/57a6f237/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeLargeMessageTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeLargeMessageTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeLargeMessageTest.java
new file mode 100644
index 0000000..600af1c
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeLargeMessageTest.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Purging with large message that hit the memory limit boundaries of a queue.
+ */
+public class PurgeLargeMessageTest extends EmbeddedBrokerTestSupport {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PurgeLargeMessageTest.class);
+
+    protected MBeanServer mbeanServer;
+    protected String domain = "org.apache.activemq";
+    protected String clientID = "foo";
+
+    protected Connection connection;
+    protected boolean transacted;
+    protected int authMode = Session.AUTO_ACKNOWLEDGE;
+
+
+
+    public void testPurgeLargeMessage() throws Exception {
+
+        final int messageCount = 600;
+
+        // Send some messages
+        connection = connectionFactory.createConnection();
+        connection.setClientID(clientID);
+        connection.start();
+        Session session = connection.createSession(transacted, authMode);
+        destination = createDestination();
+        MessageProducer producer = session.createProducer(destination);
+        Message message = createTextMessage(session, 5000);
+        for (int i = 0; i < messageCount; i++) {
+            producer.send(message);
+        }
+
+        // Now get the QueueViewMBean and purge
+        String objectNameStr = broker.getBrokerObjectName().toString();
+        objectNameStr += 
",destinationType=Queue,destinationName="+getDestinationString();
+        ObjectName queueViewMBeanName = 
assertRegisteredObjectName(objectNameStr);
+        QueueViewMBean proxy = 
(QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, 
queueViewMBeanName, QueueViewMBean.class, true);
+
+        long count = proxy.getQueueSize();
+        assertEquals("Queue size", count, messageCount);
+
+        //force a page in to exceed the memory limit.
+        proxy.browse();
+
+        // try a purge, with message still to load from the store.
+        proxy.purge();
+
+        count = proxy.getQueueSize();
+        assertEquals("Queue size", count, 0);
+        assertEquals("Browse size", proxy.browseMessages().size(), 0);
+
+        producer.close();
+    }
+
+    private TextMessage createTextMessage(Session session, long textSize) 
throws JMSException {
+
+        StringBuilder stringBuilder = new StringBuilder();
+        for(long i = 0; i<textSize;i++){
+            stringBuilder.append('A');
+        }
+        return session.createTextMessage(stringBuilder.toString());
+    }
+
+
+    protected ObjectName assertRegisteredObjectName(String name) throws 
MalformedObjectNameException, NullPointerException {
+        ObjectName objectName = new ObjectName(name);
+        if (mbeanServer.isRegistered(objectName)) {
+            echo("Bean Registered: " + objectName);
+        } else {
+            fail("Could not find MBean!: " + objectName);
+        }
+        return objectName;
+    }
+
+    protected void setUp() throws Exception {
+        bindAddress = "tcp://localhost:0";
+        useTopic = false;
+        super.setUp();
+        mbeanServer = broker.getManagementContext().getMBeanServer();
+    }
+
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+        super.tearDown();
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setUseJmx(true);
+        answer.setEnableStatistics(true);
+        answer.addConnector(bindAddress);
+        answer.deleteAllMessages();
+
+        PolicyMap policyMap = new PolicyMap();
+        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
+        PolicyEntry pe = new PolicyEntry();
+
+        //make the paging a little more deterministic
+        // by turning off the cache and periodic expiry
+        pe.setQueue(">");
+        pe.setUseCache(false);
+        pe.setMemoryLimit(1000000);
+        pe.setExpireMessagesPeriod(0);
+
+
+        entries.add(pe);
+
+        policyMap.setPolicyEntries(entries);
+        answer.setDestinationPolicy(policyMap);
+
+        return answer;
+    }
+
+    @Override
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new 
ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
+    }
+
+    protected void echo(String text) {
+        LOG.info(text);
+    }
+
+    /**
+     * Returns the name of the destination used in this test case
+     */
+    protected String getDestinationString() {
+        return getClass().getName() + "." + getName(true);
+    }
+}

Reply via email to