Author: rajdavies
Date: Fri Dec 28 11:52:24 2007
New Revision: 607317
URL: http://svn.apache.org/viewvc?rev=607317&view=rev
Log:
Tweaking for Queue performance and concurrency
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueRepeaterTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TestSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TransactionTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=607317&r1=607316&r2=607317&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Dec 28 11:52:24 2007
@@ -17,12 +17,10 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
-
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
@@ -78,7 +76,7 @@
private final Log log;
private final ActiveMQDestination destination;
- private final List<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
+ private final List<Subscription> consumers = new ArrayList<Subscription>(50);
private final SystemUsage systemUsage;
private final MemoryUsage memoryUsage;
private final DestinationStatistics destinationStatistics = new DestinationStatistics();
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=607317&r1=607316&r2=607317&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
Fri Dec 28 11:52:24 2007
@@ -40,6 +40,7 @@
protected boolean enableAudit=true;
protected ActiveMQMessageAudit audit;
private boolean started=false;
+
public synchronized void start() throws Exception {
if (!started && enableAudit && audit==null) {
@@ -259,6 +260,10 @@
if (this.audit != null) {
audit.rollback(id);
}
+ }
+
+ protected synchronized boolean isStarted() {
+ return started;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=607317&r1=607316&r2=607317&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
Fri Dec 28 11:52:24 2007
@@ -43,6 +43,7 @@
private Destination regionDestination;
private int size;
private boolean fillBatchDuplicates;
+ private boolean cacheEnabled;
/**
* @param topic
@@ -56,7 +57,13 @@
}
- public void start() throws Exception{
+ public synchronized void start() throws Exception{
+ if (!isStarted()) {
+ this.size = getStoreSize();
+ if (this.size==0) {
+ cacheEnabled=true;
+ }
+ }
super.start();
store.resetBatching();
}
@@ -78,16 +85,22 @@
}
public synchronized int size() {
- try {
- size = store.getMessageCount();
- } catch (IOException e) {
- LOG.error("Failed to get message count", e);
- throw new RuntimeException(e);
+ if (isStarted()) {
+ return size;
}
+ this.size = getStoreSize();
return size;
+
}
public synchronized void addMessageLast(MessageReference node) throws Exception {
+ if (cacheEnabled && !isFull()) {
+ //optimization - A persistent queue will add the message to
+ //to store then retrieve it again from the store.
+ recoverMessage(node.getMessage());
+ }else {
+ cacheEnabled=false;
+ }
size++;
}
@@ -95,12 +108,16 @@
size++;
}
- public void remove() {
+ public synchronized void remove() {
size--;
+ if (size==0 && isStarted()) {
+ cacheEnabled=true;
+ }
}
public void remove(MessageReference node) {
size--;
+ cacheEnabled=false;
}
public synchronized boolean hasNext() {
@@ -157,10 +174,11 @@
}
}
- public void gc() {
+ public synchronized void gc() {
for (Message msg : batchList) {
msg.decrementReferenceCount();
}
+ cacheEnabled=false;
batchList.clear();
}
@@ -172,6 +190,15 @@
store.recoverNextMessages(maxBatchSize, this);
}
fillBatchDuplicates=false;
+ }
+
+ protected synchronized int getStoreSize() {
+ try {
+ return store.getMessageCount();
+ } catch (IOException e) {
+ LOG.error("Failed to get message count", e);
+ throw new RuntimeException(e);
+ }
}
public String toString() {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java?rev=607317&r1=607316&r2=607317&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
Fri Dec 28 11:52:24 2007
@@ -43,11 +43,15 @@
* @see org.apache.activemq.broker.region.policy.DispatchPolicy#dispatch(org.apache.activemq.broker.region.MessageReference,
* org.apache.activemq.filter.MessageEvaluationContext, java.util.List)
*/
- public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List<Subscription> consumers) throws Exception {
- int count = 0;
+ public boolean dispatch(MessageReference node,
+ MessageEvaluationContext msgContext, List<Subscription> consumers)
+ throws Exception {
+ int count = 0;
- Subscription firstMatchingConsumer = null;
- for (Iterator<Subscription> iter = consumers.iterator(); iter.hasNext();) {
+ Subscription firstMatchingConsumer = null;
+ synchronized (consumers) {
+ for (Iterator<Subscription> iter = consumers.iterator(); iter
+ .hasNext();) {
Subscription sub = iter.next();
// Only dispatch to interested subscriptions
@@ -71,6 +75,7 @@
} catch (Throwable bestEffort) {
}
}
- return count > 0;
+ }
+ return count > 0;
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=607317&r1=607316&r2=607317&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
Fri Dec 28 11:52:24 2007
@@ -432,7 +432,7 @@
*/
public void recover(final MessageRecoveryListener listener) throws Exception {
flush();
- referenceStore.recover(new RecoveryListenerAdapter(this, listener));
+ referenceStore.recover(new RecoveryListenerAdapter(this, listener));
}
public void start() throws Exception {
@@ -483,29 +483,41 @@
}
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
- /*
- * RecoveryListenerAdapter recoveryListener=new
- * RecoveryListenerAdapter(this,listener);
- * if(referenceStore.supportsExternalBatchControl()){
- * synchronized(this){
- * referenceStore.recoverNextMessages(maxReturned,recoveryListener);
- * if(recoveryListener.size()==0&&recoveryListener.hasSpace()){ // check
- * for inflight messages int count=0; Iterator<Entry<MessageId,ReferenceData>>
- * iterator=messages.entrySet().iterator();
- * while(iterator.hasNext()&&count<maxReturned&&recoveryListener.hasSpace()){
- * Entry<MessageId,ReferenceData> entry=iterator.next(); ReferenceData
- * data=entry.getValue(); Message message=getMessage(data);
- * recoveryListener.recoverMessage(message); count++; }
- * referenceStore.setBatch(recoveryListener.getLastRecoveredMessageId()); } }
- * }else{ flush();
- * referenceStore.recoverNextMessages(maxReturned,recoveryListener); }
- */
+
+ RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(
+ this, listener);
+ if (referenceStore.supportsExternalBatchControl()) {
+ synchronized (this) {
+ referenceStore.recoverNextMessages(maxReturned,
+ recoveryListener);
+ if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) {
+ int count = 0;
+ Iterator<Entry<MessageId, ReferenceData>> iterator = messages
+ .entrySet().iterator();
+ while (iterator.hasNext() && count < maxReturned
+ && recoveryListener.hasSpace()) {
+ Entry<MessageId, ReferenceData> entry = iterator.next();
+ ReferenceData data = entry.getValue();
+ Message message = getMessage(data);
+ recoveryListener.recoverMessage(message);
+ count++;
+ }
+ referenceStore.setBatch(recoveryListener
+ .getLastRecoveredMessageId());
+ }
+ }
+ } else {
+ flush();
+ referenceStore.recoverNextMessages(maxReturned, recoveryListener);
+ }
+ /*
RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
referenceStore.recoverNextMessages(maxReturned, recoveryListener);
if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) {
flush();
referenceStore.recoverNextMessages(maxReturned, recoveryListener);
}
+ */
}
Message getMessage(ReferenceData data) throws IOException {
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java?rev=607317&r1=607316&r2=607317&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
Fri Dec 28 11:52:24 2007
@@ -24,7 +24,8 @@
protected BrokerService createBroker() throws Exception {
BrokerService broker = new BrokerService();
- broker.setPersistent(false);
+ //broker.setPersistent(false);
+ broker.setDeleteAllMessagesOnStartup(true);
persistenceAdapter = broker.getPersistenceAdapter();
return broker;
}
@@ -35,7 +36,7 @@
*/
protected BrokerService createRestartedBroker() throws Exception {
BrokerService broker = new BrokerService();
- broker.setPersistenceAdapter(persistenceAdapter);
+ //broker.setPersistenceAdapter(persistenceAdapter);
return broker;
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java?rev=607317&r1=607316&r2=607317&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java
Fri Dec 28 11:52:24 2007
@@ -30,9 +30,9 @@
}
protected void setUp() throws Exception {
- numberOfConsumers = 50;
- numberofProducers = 50;
- this.consumerSleepDuration=10;
+ numberOfConsumers = 10;
+ numberofProducers = 10;
+ this.consumerSleepDuration=20;
super.setUp();
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueRepeaterTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueRepeaterTest.java?rev=607317&r1=607316&r2=607317&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueRepeaterTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueRepeaterTest.java
Fri Dec 28 11:52:24 2007
@@ -18,6 +18,7 @@
import java.util.Date;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
@@ -56,7 +57,7 @@
public void testTransaction() throws Exception {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
connection = factory.createConnection();
queue = new ActiveMQQueue(getClass().getName() + "." + getName());
@@ -104,8 +105,8 @@
}
LOG.info("Waiting for latch");
- latch.await();
-
+ latch.await(2,TimeUnit.SECONDS);
+ assertNotNull(receivedText);
LOG.info("test completed, destination=" + receivedText);
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TestSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TestSupport.java?rev=607317&r1=607316&r2=607317&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TestSupport.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TestSupport.java
Fri Dec 28 11:52:24 2007
@@ -120,7 +120,7 @@
}
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
- return new ActiveMQConnectionFactory("vm://localhost");
+ return new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
}
/**
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TransactionTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TransactionTest.java?rev=607317&r1=607316&r2=607317&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TransactionTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TransactionTest.java
Fri Dec 28 11:52:24 2007
@@ -107,18 +107,6 @@
LOG.info("Waiting for latch");
latch.await(2,TimeUnit.SECONDS);
- if (receivedText==null) {
- /*
- Map<Thread,StackTraceElement[]> map = Thread.getAllStackTraces();
- for (Map.Entry<Thread,StackTraceElement[]> entry: map.entrySet()) {
- System.out.println(entry.getKey());
- for (StackTraceElement element :entry.getValue()) {
- System.out.println(element);
- }
- }
- */
- fail("No message received");
- }
assertNotNull(receivedText);
LOG.info("test completed, destination=" + receivedText);
}