Author: gtully
Date: Tue Apr 20 15:13:18 2010
New Revision: 935954
URL: http://svn.apache.org/viewvc?rev=935954&view=rev
Log:
Resolve https://issues.apache.org/activemq/browse/AMQ-2651 - modified patch
applied with thanks. The default (usePrefetchExtension = true) was left unchanged
because a number of tests and user applications depend on the current default.
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=935954&r1=935953&r2=935954&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Tue Apr 20 15:13:18 2010
@@ -60,6 +60,7 @@ public abstract class PrefetchSubscripti
protected PendingMessageCursor pending;
protected final List<MessageReference> dispatched = new
CopyOnWriteArrayList<MessageReference>();
protected int prefetchExtension;
+ protected boolean usePrefetchExtension = true;
protected long enqueueCounter;
protected long dispatchCounter;
protected long dequeueCounter;
@@ -257,7 +258,7 @@ public abstract class PrefetchSubscripti
// contract prefetch if dispatch required a pull
if (getPrefetchSize() == 0) {
prefetchExtension = Math.max(0,
prefetchExtension - index);
- } else if (context.isInTransaction()) {
+ } else if (usePrefetchExtension &&
context.isInTransaction()) {
// extend prefetch window only if not a
pulling consumer
prefetchExtension =
Math.max(prefetchExtension, index);
}
@@ -307,7 +308,9 @@ public abstract class PrefetchSubscripti
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
}
if (ack.getLastMessageId().equals(node.getMessageId())) {
- prefetchExtension = Math.max(prefetchExtension, index
+ 1);
+ if (usePrefetchExtension) {
+ prefetchExtension = Math.max(prefetchExtension,
index + 1);
+ }
destination = node.getRegionDestination();
callDispatchMatched = true;
break;
@@ -746,4 +749,12 @@ public abstract class PrefetchSubscripti
public void setMaxAuditDepth(int maxAuditDepth) {
this.maxAuditDepth = maxAuditDepth;
}
+
+ public boolean isUsePrefetchExtension() {
+ return usePrefetchExtension;
+ }
+
+ public void setUsePrefetchExtension(boolean usePrefetchExtension) {
+ this.usePrefetchExtension = usePrefetchExtension;
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=935954&r1=935953&r2=935954&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Tue Apr 20 15:13:18 2010
@@ -83,7 +83,8 @@ public class PolicyEntry extends Destina
private int
queueBrowserPrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH;
private int topicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH;
private int
durableTopicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH;
- private int cursorMemoryHighWaterMark=70;
+ private boolean usePrefetchExtension = true;
+ private int cursorMemoryHighWaterMark = 70;
private int storeUsageHighWaterMark = 100;
@@ -195,7 +196,7 @@ public class PolicyEntry extends Destina
}
sub.setMaxAuditDepth(getMaxAuditDepth());
sub.setMaxProducersToAudit(getMaxProducersToAudit());
-
+ sub.setUsePrefetchExtension(isUsePrefetchExtension());
}
public void configure(Broker broker, SystemUsage memoryManager,
QueueBrowserSubscription sub) {
@@ -207,6 +208,7 @@ public class PolicyEntry extends Destina
sub.setPrefetchSize(getQueueBrowserPrefetch());
}
sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
+ sub.setUsePrefetchExtension(isUsePrefetchExtension());
}
public void configure(Broker broker, SystemUsage memoryManager,
QueueSubscription sub) {
@@ -218,6 +220,7 @@ public class PolicyEntry extends Destina
sub.setPrefetchSize(getQueuePrefetch());
}
sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
+ sub.setUsePrefetchExtension(isUsePrefetchExtension());
}
// Properties
@@ -692,12 +695,20 @@ public class PolicyEntry extends Destina
this.durableTopicPrefetch = durableTopicPrefetch;
}
+ public boolean isUsePrefetchExtension() {
+ return this.usePrefetchExtension;
+ }
+
+ public void setUsePrefetchExtension(boolean usePrefetchExtension) {
+ this.usePrefetchExtension = usePrefetchExtension;
+ }
+
public int getCursorMemoryHighWaterMark() {
- return this.cursorMemoryHighWaterMark;
- }
+ return this.cursorMemoryHighWaterMark;
+ }
- public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark)
{
- this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
+ public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
+ this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
}
public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) {
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java?rev=935954&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
Tue Apr 20 15:13:18 2010
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.ServerSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+// see: https://issues.apache.org/activemq/browse/AMQ-2651
+public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport {
+ private static final Log LOG =
LogFactory.getLog(OnePrefetchAsyncConsumerTest.class);
+
+ private TestMutex testMutex;
+ protected Connection connection;
+ protected ConnectionConsumer connectionConsumer;
+ protected Queue queue;
+ protected CountDownLatch messageTwoDelay = new CountDownLatch(1);
+
+ public void testPrefetchExtension() throws Exception {
+ Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(queue);
+
+ // when Msg1 is acked, the PrefetchSubscription will (incorrectly?) increment its prefetchExtension
+ producer.send(session.createTextMessage("Msg1"));
+
+ // Msg2 will exhaust the ServerSessionPool (since it only has 1 ServerSession)
+ producer.send(session.createTextMessage("Msg2"));
+
+ // Msg3 will cause the test to fail as it will attempt to retrieve an additional ServerSession from
+ // an exhausted ServerSessionPool due to the (incorrectly?) incremented prefetchExtension in the PrefetchSubscription
+ producer.send(session.createTextMessage("Msg3"));
+
+ session.commit();
+
+ // wait for test to complete and the test result to get set
+ // this happens asynchronously since the messages are delivered asynchronously
+ synchronized (testMutex) {
+ while (!testMutex.testCompleted) {
+ testMutex.wait();
+ }
+ }
+
+ //test completed, result is ready
+ assertTrue("Attempted to retrieve more than one ServerSession at a
time", testMutex.testSuccessful);
+ }
+
+ protected void setUp() throws Exception {
+ bindAddress = "tcp://localhost:61616";
+ super.setUp();
+
+ testMutex = new TestMutex();
+ connection = createConnection();
+ queue = createQueue();
+ // note the last arg of 1, this becomes the prefetchSize in PrefetchSubscription
+ connectionConsumer = connection.createConnectionConsumer(
+ queue, null, new TestServerSessionPool(connection), 1);
+ connection.start();
+ }
+
+ protected void tearDown() throws Exception {
+ connectionConsumer.close();
+ connection.close();
+ super.tearDown();
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ BrokerService answer = super.createBroker();
+ PolicyMap policyMap = new PolicyMap();
+ PolicyEntry defaultEntry = new PolicyEntry();
+ // ensure prefetch is exact: only deliver the next message once the current one is acked
+ defaultEntry.setUsePrefetchExtension(false);
+ policyMap.setDefaultEntry(defaultEntry);
+ answer.setDestinationPolicy(policyMap);
+ return answer;
+ }
+
+ protected Queue createQueue() {
+ return new ActiveMQQueue(getDestinationString());
+ }
+
+ // simulates a ServerSessionPool with only 1 ServerSession
+ private class TestServerSessionPool implements ServerSessionPool {
+ Connection connection;
+ TestServerSession serverSession;
+ boolean serverSessionInUse = false;
+
+ public TestServerSessionPool(Connection connection) throws
JMSException {
+ this.connection = connection;
+ serverSession = new TestServerSession(this);
+ }
+
+ public ServerSession getServerSession() throws JMSException {
+ synchronized (this) {
+ if (serverSessionInUse) {
+ LOG.info("asked for session while in use, not serialised
delivery");
+ synchronized (testMutex) {
+ testMutex.testSuccessful = false;
+ testMutex.testCompleted = true;
+ }
+ }
+ serverSessionInUse = true;
+ return serverSession;
+ }
+ }
+ }
+
+ private class TestServerSession implements ServerSession {
+ TestServerSessionPool pool;
+ Session session;
+
+ public TestServerSession(TestServerSessionPool pool) throws
JMSException {
+ this.pool = pool;
+ session = pool.connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
+ session.setMessageListener(new TestMessageListener());
+ }
+
+ public Session getSession() throws JMSException {
+ return session;
+ }
+
+ public void start() throws JMSException {
+ // use a separate thread to process the message asynchronously
+ new Thread() {
+ public void run() {
+ // let the session deliver the message
+ session.run();
+
+ // commit the tx
+ try {
+ session.commit();
+ }
+ catch (JMSException e) {
+ }
+
+ // return ServerSession to pool
+ synchronized (pool) {
+ pool.serverSessionInUse = false;
+ }
+
+ // let the test check if the test was completed
+ synchronized (testMutex) {
+ testMutex.notify();
+ }
+ }
+ }.start();
+ }
+ }
+
+ private class TestMessageListener implements MessageListener {
+ public void onMessage(Message message) {
+ try {
+ String text = ((TextMessage)message).getText();
+ LOG.info("got message: " + text);
+ if (text.equals("Msg3")) {
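+ // if we get here before getServerSession() has flagged a failure, delivery was serialised and the test is successful
+ // (getServerSession() marks the test as failed when the single ServerSession is requested while still in use);
+ // this relies on usePrefetchExtension=false so that PrefetchSubscription does not extend the prefetch window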
+ synchronized (testMutex) {
+ if (!testMutex.testCompleted) {
+ testMutex.testSuccessful = true;
+ testMutex.testCompleted = true;
+ }
+ }
+ }
+ else if (text.equals("Msg2")) {
+ // simulate long message processing so that Msg3 comes when Msg2 is still being processed
+ // and thus the single ServerSession is in use
+ TimeUnit.SECONDS.sleep(4);
+ }
+ }
+ catch (JMSException e) {
+ }
+ catch (InterruptedException e) {
+ }
+ }
+ }
+
+ private class TestMutex {
+ boolean testCompleted = false;
+ boolean testSuccessful = true;
+ }
+}
\ No newline at end of file
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date