Repository: activemq
Updated Branches:
  refs/heads/master 5b73ffad6 -> 2b84cd60b


https://issues.apache.org/jira/browse/AMQ-6128 - fix and test - fix values on 
priority ordered pending - messages assigned to browsers need to be ordered


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2b84cd60
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2b84cd60
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2b84cd60

Branch: refs/heads/master
Commit: 2b84cd60ba775a2e0106a3c6f577ea78b05b143b
Parents: 5b73ffa
Author: gtully <[email protected]>
Authored: Fri Jan 15 14:20:49 2016 +0000
Committer: gtully <[email protected]>
Committed: Fri Jan 15 14:31:09 2016 +0000

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |   5 +-
 .../region/cursors/OrderedPendingList.java      |   6 +-
 .../region/cursors/PrioritizedPendingList.java  |   9 +-
 .../cursors/PrioritizedPendingListTest.java     |  24 ++
 .../bugs/JMSQueueBrowserPriorityTest.java       | 225 +++++++++++++++++++
 5 files changed, 261 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/2b84cd60/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index d447ebd..9a7feef 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1619,10 +1619,11 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
             }
 
             if (hasBrowsers) {
-                ArrayList<MessageReference> alreadyDispatchedMessages = null;
+                PendingList alreadyDispatchedMessages = 
isPrioritizedMessages() ?
+                        new PrioritizedPendingList() : new 
OrderedPendingList();
                 pagedInMessagesLock.readLock().lock();
                 try{
-                    alreadyDispatchedMessages = new 
ArrayList<MessageReference>(pagedInMessages.values());
+                    alreadyDispatchedMessages.addAll(pagedInMessages);
                 }finally {
                     pagedInMessagesLock.readLock().unlock();
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/2b84cd60/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
index 28fc13c..71b7212 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
@@ -169,8 +169,12 @@ public class OrderedPendingList implements PendingList {
 
     @Override
     public Collection<MessageReference> values() {
+        return getValues(this);
+    }
+
+    public static Collection<MessageReference> getValues(final PendingList 
pendingList) {
         List<MessageReference> messageReferences = new 
ArrayList<MessageReference>();
-        Iterator<MessageReference> iterator = iterator();
+        Iterator<MessageReference> iterator = pendingList.iterator();
         while (iterator.hasNext()) {
             messageReferences.add(iterator.next());
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/2b84cd60/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
index da2ccaf..deabd50 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
@@ -27,6 +27,9 @@ import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.management.SizeStatisticImpl;
 
+
+import static 
org.apache.activemq.broker.region.cursors.OrderedPendingList.getValues;
+
 public class PrioritizedPendingList implements PendingList {
 
     private static final Integer MAX_PRIORITY = 10;
@@ -164,11 +167,7 @@ public class PrioritizedPendingList implements PendingList 
{
 
     @Override
     public Collection<MessageReference> values() {
-        List<MessageReference> messageReferences = new 
ArrayList<MessageReference>();
-        for (PendingNode pendingNode : map.values()) {
-            messageReferences.add(pendingNode.getMessage());
-        }
-        return messageReferences;
+        return getValues(this);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/2b84cd60/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
index 6c40239..80d090d 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
@@ -191,6 +191,30 @@ public class PrioritizedPendingListTest {
         }
     }
 
+    @Test
+    public void testValuesPriority() {
+        PrioritizedPendingList list = new PrioritizedPendingList();
+
+        list.addMessageFirst(new TestMessageReference(1, 2));
+        list.addMessageFirst(new TestMessageReference(2, 1));
+        list.addMessageFirst(new TestMessageReference(3, 3));
+        list.addMessageFirst(new TestMessageReference(4, 5));
+        list.addMessageFirst(new TestMessageReference(5, 4));
+
+        assertTrue(list.size() == 5);
+
+        Iterator<MessageReference> iter = list.iterator();
+        int lastId = list.size();
+        while (iter.hasNext()) {
+            assertEquals(lastId--, iter.next().getMessage().getPriority());
+        }
+
+        lastId = list.size();
+        for (MessageReference messageReference : list.values()) {
+            assertEquals(lastId--, 
messageReference.getMessage().getPriority());
+        }
+    }
+
     static class TestMessageReference implements MessageReference {
 
         private static final IdGenerator id = new IdGenerator();

http://git-wip-us.apache.org/repos/asf/activemq/blob/2b84cd60/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/JMSQueueBrowserPriorityTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/JMSQueueBrowserPriorityTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/JMSQueueBrowserPriorityTest.java
new file mode 100644
index 0000000..3029fb2
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/JMSQueueBrowserPriorityTest.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// https://issues.apache.org/jira/browse/AMQ-6128
+public class JMSQueueBrowserPriorityTest extends TestCase {
+
+    static final Logger LOG = 
LoggerFactory.getLogger(JMSQueueBrowserPriorityTest.class);
+    private static final String TEST_AMQ_BROKER_URI = "tcp://localhost:0";
+    private BrokerService broker;
+    public static final byte[] PAYLOAD = new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 
9};
+
+
+    protected void setUp() throws Exception {
+        broker = createBroker();
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    protected void tearDown() throws Exception {
+
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+
+    /**
+     * Send MEDIUM priority
+     * Send HIGH priority
+     * Send HIGH priority
+     * <p/>
+     * browse the list of messages
+     * <p/>
+     * consume the messages from the queue
+     * <p/>
+     * Compare browse and consumed messages - they should be the same
+     *
+     * @throws Exception
+     */
+    public void testBrowsePriorityMessages() throws Exception {
+
+
+        for (int i = 0; i < 5; i++) {
+
+            // MED
+            produceMessages(3, 4, "TestQ");
+
+            Thread.sleep(1000);
+
+            // HI
+            produceMessages(3, 9, "TestQ");
+
+            // browse messages, will page in
+            ArrayList<Integer> browseList = browseQueue("TestQ");
+
+            // HI
+            produceMessages(3, 9, "TestQ");
+
+            // browse again to be sure new messages are picked up
+            browseList = browseQueue("TestQ");
+
+            // consume messages to verify order
+            ArrayList<Integer> consumeList = consumeMessages("TestQ");
+
+            if (!browseList.equals(consumeList)) {
+                LOG.info("browseList size " + browseList.size());
+                LOG.info("consumeList size " + consumeList.size());
+                LOG.info("browseList is:" + browseList);
+                LOG.info("consumeList is:" + consumeList);
+            }
+
+            // compare lists
+            assertTrue("browseList and consumeList should be equal, iteration 
" + i, browseList.equals(consumeList));
+        }
+    }
+
+    private void produceMessages(int numberOfMessages, int priority, String 
queueName) throws Exception {
+
+        ConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(broker.getDefaultSocketURIString());
+        Connection connection = connectionFactory.createConnection();
+        try {
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(new 
ActiveMQQueue(queueName));
+            connection.start();
+
+
+            for (int i = 0; i < numberOfMessages; i++) {
+                BytesMessage m = session.createBytesMessage();
+                m.writeBytes(PAYLOAD);
+                m.setJMSPriority(priority);
+                producer.send(m, Message.DEFAULT_DELIVERY_MODE, 
m.getJMSPriority(), Message.DEFAULT_TIME_TO_LIVE);
+            }
+
+        } finally {
+
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
+
+    private ArrayList<Integer> browseQueue(String queueName) throws Exception {
+
+        ArrayList<Integer> returnedMessages = new ArrayList<Integer>();
+
+        ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(broker.getDefaultSocketURIString());
+        Connection connection = connectionFactory.createConnection();
+        try {
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            QueueBrowser browser = session.createBrowser(new 
ActiveMQQueue(queueName));
+            connection.start();
+
+            Enumeration<Message> browsedMessages = browser.getEnumeration();
+
+            while (browsedMessages.hasMoreElements()) {
+                
returnedMessages.add(browsedMessages.nextElement().getJMSPriority());
+            }
+
+            return returnedMessages;
+
+        } finally {
+
+            if (connection != null) {
+                connection.close();
+            }
+        }
+
+    }
+
+
+    private ArrayList<Integer> consumeMessages(String queueName) throws 
Exception {
+
+        ArrayList<Integer> returnedMessages = new ArrayList<Integer>();
+
+        ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(broker.getDefaultSocketURIString());
+        connectionFactory.setMessagePrioritySupported(true);
+        Connection connection = connectionFactory.createConnection();
+        try {
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(new 
ActiveMQQueue(queueName));
+            connection.start();
+            boolean finished = false;
+
+            while (!finished) {
+
+                Message message = consumer.receive(1000);
+                if (message == null) {
+                    finished = true;
+                }
+
+                if (message != null) {
+                    returnedMessages.add(message.getJMSPriority());
+                }
+
+            }
+            return returnedMessages;
+
+        } finally {
+
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
+
+    private BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+
+        PolicyMap policyMap = new PolicyMap();
+        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
+        PolicyEntry pe = new PolicyEntry();
+
+
+        pe.setProducerFlowControl(true);
+        pe.setUseCache(true);
+
+        pe.setPrioritizedMessages(true);
+        pe.setExpireMessagesPeriod(0);
+
+        pe.setQueue(">");
+        entries.add(pe);
+        policyMap.setPolicyEntries(entries);
+        broker.setDestinationPolicy(policyMap);
+
+
+        broker.addConnector(TEST_AMQ_BROKER_URI);
+        broker.deleteAllMessages();
+        return broker;
+    }
+}
\ No newline at end of file

Reply via email to