Author: gtully
Date: Wed Feb 10 10:17:50 2010
New Revision: 908437
URL: http://svn.apache.org/viewvc?rev=908437&view=rev
Log:
svn merge -c 908060 https://svn.apache.org/repos/asf/activemq/trunk, svn merge
-c 908121 https://svn.apache.org/repos/asf/activemq/trunk - resolve failure of
JDBCStoreBrokerTest on slow machines; identified a sync issue with dispatch to
queue browsers. Tidied up dispatch to queue browsers to tie it to normal
dispatch and avoid the timing/sync issue. The browser snapshot of paged-in
messages is only guaranteed to be > pageSize, so the test should assert only that.
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=908437&r1=908436&r2=908437&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Wed Feb 10 10:17:50 2010
@@ -296,16 +296,13 @@
}
/*
- * Holder for subscription and pagedInMessages as a browser needs access to
- * existing messages in the queue that have already been dispatched
+ * Holder for subscription that needs attention on next iterate
+ * browser needs access to existing messages in the queue that have
already been dispatched
*/
class BrowserDispatch {
- ArrayList<QueueMessageReference> messages;
QueueBrowserSubscription browser;
- public BrowserDispatch(QueueBrowserSubscription browserSubscription,
Collection<QueueMessageReference> values) {
-
- messages = new ArrayList<QueueMessageReference>(values);
+ public BrowserDispatch(QueueBrowserSubscription browserSubscription) {
browser = browserSubscription;
browser.incrementQueueRef();
}
@@ -362,18 +359,14 @@
}
if (sub instanceof QueueBrowserSubscription) {
+ // tee up for dispatch in next iterate
QueueBrowserSubscription browserSubscription =
(QueueBrowserSubscription) sub;
-
- // do again in iterate to ensure new messages are dispatched
- pageInMessages(false);
-
synchronized (pagedInMessages) {
- if (!pagedInMessages.isEmpty()) {
- BrowserDispatch browserDispatch = new
BrowserDispatch(browserSubscription, pagedInMessages.values());
- browserDispatches.addLast(browserDispatch);
- }
+ BrowserDispatch browserDispatch = new
BrowserDispatch(browserSubscription);
+ browserDispatches.addLast(browserDispatch);
}
}
+
if (!(this.optimizedDispatch || isSlave())) {
wakeup();
}
@@ -1154,7 +1147,7 @@
* @see org.apache.activemq.thread.Task#iterate()
*/
public boolean iterate() {
- boolean pageInMoreMessages = false;
+ boolean pageInMoreMessages = false;
synchronized (iteratingMutex) {
// do early to allow dispatch of these waiting messages
@@ -1172,31 +1165,6 @@
}
}
- BrowserDispatch rd;
- while ((rd = getNextBrowserDispatch()) != null) {
- pageInMoreMessages = true;
-
- try {
- MessageEvaluationContext msgContext = new
NonCachedMessageEvaluationContext();
- msgContext.setDestination(destination);
-
- QueueBrowserSubscription browser = rd.getBrowser();
- for (QueueMessageReference node : rd.messages) {
- if (!node.isAcked()) {
- msgContext.setMessageReference(node);
- if (browser.matches(node, msgContext)) {
- browser.add(node);
- }
- }
- }
-
- rd.done();
-
- } catch (Exception e) {
- LOG.warn("exception on dispatch to browser: " +
rd.getBrowser(), e);
- }
- }
-
if (firstConsumer) {
firstConsumer = false;
try {
@@ -1225,6 +1193,8 @@
LOG.error(e);
}
}
+
+ BrowserDispatch pendingBrowserDispatch = getNextBrowserDispatch();
synchronized (messages) {
pageInMoreMessages |= !messages.isEmpty();
@@ -1239,14 +1209,46 @@
// Perhaps we should page always into the pagedInPendingDispatch
list if
// !messages.isEmpty(), and then if
!pagedInPendingDispatch.isEmpty()
// then we do a dispatch.
- if (pageInMoreMessages) {
+ if (pageInMoreMessages || pendingBrowserDispatch != null) {
try {
- pageInMessages(false);
+ pageInMessages(pendingBrowserDispatch != null);
} catch (Throwable e) {
LOG.error("Failed to page in more queue messages ", e);
}
}
+
+ if (pendingBrowserDispatch != null) {
+ ArrayList<QueueMessageReference> alreadyDispatchedMessages =
null;
+ synchronized (pagedInMessages) {
+ alreadyDispatchedMessages = new
ArrayList<QueueMessageReference>(pagedInMessages.values());
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("dispatch to browser: " +
pendingBrowserDispatch.getBrowser()
+ + ", already dispatched/paged count: " +
alreadyDispatchedMessages.size());
+ }
+ do {
+ try {
+ MessageEvaluationContext msgContext = new
NonCachedMessageEvaluationContext();
+ msgContext.setDestination(destination);
+
+ QueueBrowserSubscription browser =
pendingBrowserDispatch.getBrowser();
+ for (QueueMessageReference node :
alreadyDispatchedMessages) {
+ if (!node.isAcked()) {
+ msgContext.setMessageReference(node);
+ if (browser.matches(node, msgContext)) {
+ browser.add(node);
+ }
+ }
+ }
+ pendingBrowserDispatch.done();
+ } catch (Exception e) {
+ LOG.warn("exception on dispatch to browser: " +
pendingBrowserDispatch.getBrowser(), e);
+ }
+
+ } while ((pendingBrowserDispatch = getNextBrowserDispatch())
!= null);
+ }
+
if (pendingWakeups.get() > 0) {
pendingWakeups.decrementAndGet();
}
Modified:
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java?rev=908437&r1=908436&r2=908437&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
Wed Feb 10 10:17:50 2010
@@ -16,7 +16,9 @@
*/
package org.apache.activemq;
+import java.util.ArrayList;
import java.util.Enumeration;
+import java.util.List;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -26,7 +28,15 @@
import javax.jms.Session;
import javax.jms.TextMessage;
+import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.broker.region.BaseDestination;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
/**
* @version $Revision: 1.4 $
@@ -127,5 +137,54 @@
browser.close();
producer.close();
- }
+ }
+
+ public void testQueueBrowserWith2Consumers() throws Exception {
+ final int numMessages = 1000;
+ connection.setAlwaysSyncSend(false);
+ Session session = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+ ActiveMQQueue destination = new ActiveMQQueue("TEST");
+ ActiveMQQueue destinationPrefetch10 = new
ActiveMQQueue("TEST?jms.prefetchSize=10");
+ ActiveMQQueue destinationPrefetch1 = new
ActiveMQQueue("TEST?jms.prefetchsize=1");
+ connection.start();
+
+ ActiveMQConnection connection2 =
(ActiveMQConnection)factory.createConnection(userName, password);
+ connection2.start();
+ connections.add(connection2);
+ Session session2 = connection2.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = session.createProducer(destination);
+ MessageConsumer consumer =
session.createConsumer(destinationPrefetch10);
+
+ for (int i=0; i<numMessages; i++) {
+ TextMessage message = session.createTextMessage("Message: " + i);
+ producer.send(message);
+ }
+
+ QueueBrowser browser = session2.createBrowser(destinationPrefetch1);
+ Enumeration<Message> browserView = browser.getEnumeration();
+
+ List<Message> messages = new ArrayList<Message>();
+ for (int i = 0; i < numMessages; i++) {
+ Message m1 = consumer.receive(5000);
+ assertNotNull("m1 is null for index: " + i, m1);
+ messages.add(m1);
+ }
+
+ int i = 0;
+ for (; i < numMessages && browserView.hasMoreElements(); i++) {
+ Message m1 = messages.get(i);
+ Message m2 = browserView.nextElement();
+ assertNotNull("m2 is null for index: " + i, m2);
+ assertEquals(m1.getJMSMessageID(), m2.getJMSMessageID());
+ }
+
+ // currently browse max page size is ignored for a queue browser
consumer
+ // only guarantee is a page size - but a snapshot of pagedinpending is
+ // used so it is most likely more
+ assertTrue("got at least our expected minimum in the browser: ", i >
BaseDestination.MAX_PAGE_SIZE);
+
+ assertFalse("nothing left in the browser",
browserView.hasMoreElements());
+ assertNull("consumer finished", consumer.receiveNoWait());
+ }
}