Author: gtully
Date: Fri Nov 13 17:04:09 2009
New Revision: 835921
URL: http://svn.apache.org/viewvc?rev=835921&view=rev
Log:
merge -c 835874 - resolve https://issues.apache.org/activemq/browse/AMQ-2483
and https://issues.apache.org/activemq/browse/AMQ-2028. Keeping track of
outstanding wakeup requests in Queue with the regular task runner avoids the
build-up seen with the deterministic task runner. Exposed useDedicatedTaskRunner
to validate some tests that fail with the -DuseDedicatedTaskRunner=true system
property. With broker.useDedicatedTask=false, Queues will use a pooled executor
for dispatch.
Added:
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/DedicatedTaskRunnerBrokerTest.java
- copied unchanged from r835874,
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DedicatedTaskRunnerBrokerTest.java
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Fri Nov 13 17:04:09 2009
@@ -114,7 +114,7 @@
protected boolean dispatchAsync=true;
protected boolean alwaysSessionAsync = true;
- private TaskRunnerFactory sessionTaskRunner = new
TaskRunnerFactory("ActiveMQ Session Task",
ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000);
+ private TaskRunnerFactory sessionTaskRunner;
private final ThreadPoolExecutor asyncConnectionThread;
// Connection state variables
@@ -186,6 +186,7 @@
private ConnectionAudit connectionAudit = new ConnectionAudit();
private DestinationSource destinationSource;
private final Object ensureConnectionInfoSentMutex = new Object();
+ private boolean useDedicatedTaskRunner;
/**
* Construct an <code>ActiveMQConnection</code>
@@ -644,7 +645,9 @@
// factory
// then we may need to call
// factory.onConnectionClose(this);
- sessionTaskRunner.shutdown();
+ if (sessionTaskRunner != null) {
+ sessionTaskRunner.shutdown();
+ }
closed.set(true);
closing.set(false);
}
@@ -927,7 +930,20 @@
transportListeners.remove(transportListener);
}
+ public boolean isUseDedicatedTaskRunner() {
+ return useDedicatedTaskRunner;
+ }
+
+ public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
+ this.useDedicatedTaskRunner = useDedicatedTaskRunner;
+ }
+
public TaskRunnerFactory getSessionTaskRunner() {
+ synchronized (this) {
+ if (sessionTaskRunner == null) {
+ sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session
Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000,
isUseDedicatedTaskRunner());
+ }
+ }
return sessionTaskRunner;
}
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
Fri Nov 13 17:04:09 2009
@@ -107,12 +107,13 @@
private boolean watchTopicAdvisories = true;
private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
private long warnAboutUnstartedConnectionTimeout = 500L;
- private int sendTimeout =0;
+ private int sendTimeout = 0;
private boolean sendAcksAsync=true;
private TransportListener transportListener;
- private ExceptionListener exceptionListener;
- private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
- private int auditMaximumProducerNumber =
ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
+ private ExceptionListener exceptionListener;
+ private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
+ private int auditMaximumProducerNumber =
ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
+ private boolean useDedicatedTaskRunner;
// /////////////////////////////////////////////
//
@@ -313,6 +314,7 @@
connection.setSendAcksAsync(isSendAcksAsync());
connection.setAuditDepth(getAuditDepth());
connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
+ connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
if (transportListener != null) {
connection.addTransportListener(transportListener);
}
@@ -903,4 +905,12 @@
public void setAuditMaximumProducerNumber(int
auditMaximumProducerNumber) {
this.auditMaximumProducerNumber = auditMaximumProducerNumber;
}
+
+ public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
+ this.useDedicatedTaskRunner = useDedicatedTaskRunner;
+ }
+
+ public boolean isUseDedicatedTaskRunner() {
+ return useDedicatedTaskRunner;
+ }
}
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
Fri Nov 13 17:04:09 2009
@@ -91,6 +91,10 @@
if (taskRunner == null) {
synchronized (this) {
if (this.taskRunner == null) {
+ if (!isRunning()) {
+ // stop has been called
+ return;
+ }
this.taskRunner =
session.connection.getSessionTaskRunner().createTaskRunner(this,
"ActiveMQ Session: " +
session.getSessionId());
}
@@ -142,11 +146,12 @@
void stop() throws JMSException {
try {
if (messageQueue.isRunning()) {
- messageQueue.stop();
- TaskRunner taskRunner = this.taskRunner;
- if (taskRunner != null) {
- this.taskRunner = null;
- taskRunner.shutdown();
+ synchronized(this) {
+ messageQueue.stop();
+ if (this.taskRunner != null) {
+ this.taskRunner.shutdown();
+ this.taskRunner = null;
+ }
}
}
} catch (InterruptedException e) {
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Fri Nov 13 17:04:09 2009
@@ -933,7 +933,7 @@
public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
if (taskRunnerFactory == null) {
persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence
Adaptor Task", persistenceThreadPriority,
- true, 1000);
+ true, 1000, isDedicatedTaskRunner());
}
return persistenceTaskRunnerFactory;
}
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Fri Nov 13 17:04:09 2009
@@ -899,21 +899,26 @@
for (TransportConnectionState cs : connectionStates) {
cs.getContext().getStopping().set(true);
}
- new Thread("ActiveMQ Transport Stopper: " +
transport.getRemoteAddress()) {
- @Override
- public void run() {
- serviceLock.writeLock().lock();
- try {
- doStop();
- } catch (Throwable e) {
- LOG.debug("Error occured while shutting down a
connection to '" + transport.getRemoteAddress()
- + "': ", e);
- } finally {
- stopped.countDown();
- serviceLock.writeLock().unlock();
+ try {
+ new Thread("ActiveMQ Transport Stopper: " +
transport.getRemoteAddress()) {
+ @Override
+ public void run() {
+ serviceLock.writeLock().lock();
+ try {
+ doStop();
+ } catch (Throwable e) {
+ LOG.debug("Error occured while shutting down a
connection to '" + transport.getRemoteAddress()
+ + "': ", e);
+ } finally {
+ stopped.countDown();
+ serviceLock.writeLock().unlock();
+ }
}
- }
- }.start();
+ }.start();
+ } catch (Throwable t) {
+ LOG.warn("cannot create async transport stopper thread.. not
waiting for stop to complete, reason:", t);
+ stopped.countDown();
+ }
}
}
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Nov 13 17:04:09 2009
@@ -31,9 +31,8 @@
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
@@ -65,7 +64,6 @@
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.thread.DeterministicTaskRunner;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
@@ -86,7 +84,7 @@
*/
public class Queue extends BaseDestination implements Task, UsageListener {
protected static final Log LOG = LogFactory.getLog(Queue.class);
- protected TaskRunnerFactory taskFactory;
+ protected final TaskRunnerFactory taskFactory;
protected TaskRunner taskRunner;
protected final List<Subscription> consumers = new
ArrayList<Subscription>(50);
protected PendingMessageCursor messages;
@@ -108,9 +106,11 @@
private int timeBeforeDispatchStarts = 0;
private int consumersBeforeDispatchStarts = 0;
private CountDownLatch consumersBeforeStartsLatch;
+ private AtomicLong pendingWakeups = new AtomicLong();
+
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
public void run() {
- wakeup();
+ asyncWakeup();
}
};
private final Runnable expireMessagesTask = new Runnable() {
@@ -164,26 +164,13 @@
// since it turns into a shared blocking queue which can lead to a
network deadlock.
// If we are cursoring to disk..it's not and issue because it does not
block due
// to large disk sizes.
- if( messages instanceof VMPendingMessageCursor ) {
+ if (messages instanceof VMPendingMessageCursor) {
this.systemUsage = brokerService.getSystemUsage();
memoryUsage.setParent(systemUsage.getMemoryUsage());
}
- if (isOptimizedDispatch()) {
- this.taskRunner = taskFactory.createTaskRunner(this, "TempQueue:
" + destination.getPhysicalName());
- }else {
- final Queue queue = this;
- this.executor = Executors.newSingleThreadExecutor(new
ThreadFactory() {
- public Thread newThread(Runnable runnable) {
- Thread thread = new QueueThread(runnable,
"QueueThread:"+destination, queue);
- thread.setDaemon(true);
- thread.setPriority(Thread.NORM_PRIORITY);
- return thread;
- }
- });
-
- this.taskRunner = new DeterministicTaskRunner(this.executor,this);
- }
+ this.taskRunner =
+ taskFactory.createTaskRunner(this, "Queue:" +
destination.getPhysicalName());
super.initialize();
if (store != null) {
@@ -591,6 +578,7 @@
}
};
doBrowse(browsedMessages, this.getMaxExpirePageSize());
+ asyncWakeup();
}
public void gc(){
@@ -1190,8 +1178,8 @@
} catch (Throwable e) {
LOG.error("Failed to page in more queue messages ", e);
}
- }
- return !messagesWaitingForSpace.isEmpty();
+ }
+ return pendingWakeups.decrementAndGet() > 0;
}
}
@@ -1297,7 +1285,6 @@
} catch (IOException e) {
LOG.error("Failed to remove expired Message from the store ",e);
}
- asyncWakeup();
}
protected ConnectionContext createConnectionContext() {
@@ -1336,14 +1323,16 @@
public void wakeup() {
if (optimizedDispatch || isSlave()) {
iterate();
+ pendingWakeups.incrementAndGet();
} else {
asyncWakeup();
}
}
-
- public void asyncWakeup() {
+
+ private void asyncWakeup() {
try {
- this.taskRunner.wakeup();
+ pendingWakeups.incrementAndGet();
+ this.taskRunner.wakeup();
} catch (InterruptedException e) {
LOG.warn("Async task tunner failed to wakeup ", e);
}
@@ -1432,7 +1421,7 @@
pagedInPendingDispatch.add(qmr);
}
}
- doWakeUp = true;
+ doWakeUp = true;
}
}
}
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
Fri Nov 13 17:04:09 2009
@@ -55,6 +55,7 @@
private boolean recoverReferenceStore=true;
private boolean forceRecoverReferenceStore=false;
private long checkpointInterval = 1000 * 20;
+ private boolean useDedicatedTaskRunner;
/**
@@ -109,13 +110,21 @@
this.dataDirectory = dataDirectory;
}
+ public boolean isUseDedicatedTaskRunner() {
+ return useDedicatedTaskRunner;
+ }
+
+ public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
+ this.useDedicatedTaskRunner = useDedicatedTaskRunner;
+ }
+
/**
* @return the taskRunnerFactory
*/
public TaskRunnerFactory getTaskRunnerFactory() {
if (taskRunnerFactory == null) {
taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor
Task", journalThreadPriority,
- true, 1000);
+ true, 1000,
isUseDedicatedTaskRunner());
}
return taskRunnerFactory;
}
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java
Fri Nov 13 17:04:09 2009
@@ -53,6 +53,7 @@
private boolean failIfJournalIsLocked;
private int journalThreadPriority = Thread.MAX_PRIORITY;
private JDBCPersistenceAdapter jdbcPersistenceAdapter = new
JDBCPersistenceAdapter();
+ private boolean useDedicatedTaskRunner;
public PersistenceAdapter createPersistenceAdapter() throws IOException {
jdbcPersistenceAdapter.setDataSource(getDataSource());
@@ -110,10 +111,18 @@
this.useJournal = useJournal;
}
+ public boolean isUseDedicatedTaskRunner() {
+ return useDedicatedTaskRunner;
+ }
+
+ public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
+ this.useDedicatedTaskRunner = useDedicatedTaskRunner;
+ }
+
public TaskRunnerFactory getTaskRunnerFactory() {
if (taskRunnerFactory == null) {
taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor
Task", journalThreadPriority,
- true, 1000);
+ true, 1000,
isUseDedicatedTaskRunner());
}
return taskRunnerFactory;
}
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
Fri Nov 13 17:04:09 2009
@@ -43,7 +43,7 @@
this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000);
}
- public TaskRunnerFactory(String name, int priority, boolean daemon, int
maxIterationsPerRun) {
+ private TaskRunnerFactory(String name, int priority, boolean daemon, int
maxIterationsPerRun) {
this(name,priority,daemon,maxIterationsPerRun,false);
}
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
Fri Nov 13 17:04:09 2009
@@ -333,7 +333,9 @@
if (mcast != null) {
mcast.close();
}
- runner.interrupt();
+ if (runner != null) {
+ runner.interrupt();
+ }
getExecutor().shutdownNow();
}
}
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
Fri Nov 13 17:04:09 2009
@@ -45,7 +45,8 @@
private static final Object DISCONNECT = new Object();
private static final AtomicLong NEXT_ID = new AtomicLong(0);
- private static final TaskRunnerFactory TASK_RUNNER_FACTORY = new
TaskRunnerFactory("VMTransport", Thread.NORM_PRIORITY, true, 1000);
+ // still possible to configure dedicated task runner through system
property but not programmatically
+ private static final TaskRunnerFactory TASK_RUNNER_FACTORY = new
TaskRunnerFactory("VMTransport", Thread.NORM_PRIORITY, true, 1000, false);
protected VMTransport peer;
protected TransportListener transportListener;
protected boolean disposed;
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java
Fri Nov 13 17:04:09 2009
@@ -55,7 +55,7 @@
@SuppressWarnings("serial")
class Trace extends Throwable {
- public int count;
+ public int count = 1;
public final int size;
Trace() {
super();
Modified:
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
Fri Nov 13 17:04:09 2009
@@ -47,7 +47,7 @@
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -105,7 +105,7 @@
final DestinationStatistics destinationStatistics = new
DestinationStatistics();
consumerInfo.setExclusive(true);
final Queue queue = new Queue(brokerService, destination,
- queueMessageStore, destinationStatistics, null);
+ queueMessageStore, destinationStatistics,
brokerService.getTaskRunnerFactory());
// a workaround for this issue
// queue.setUseCache(false);
Modified:
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
Fri Nov 13 17:04:09 2009
@@ -42,15 +42,17 @@
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
-import junit.framework.TestCase;
+import junit.framework.Test;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
+import org.apache.activemq.util.ThreadTracker;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -59,7 +61,7 @@
* @version $Revision: 1.5 $
* A Test case for AMQ-1479
*/
-public class DurableConsumerTest extends TestCase {
+public class DurableConsumerTest extends CombinationTestSupport {
private static final Log LOG =
LogFactory.getLog(DurableConsumerTest.class);
private static int COUNT = 1024*10;
private static String CONSUMER_NAME = "DURABLE_TEST";
@@ -73,8 +75,8 @@
private static final String TOPIC_NAME = "failoverTopic";
private static final String CONNECTION_URL =
"failover:(tcp://localhost:61616,tcp://localhost:61617)";
-
-
+ public boolean useDedicatedTaskRunner = false;
+
private class SimpleTopicSubscriber implements MessageListener,
ExceptionListener {
private TopicConnection topicConnection = null;
@@ -180,8 +182,7 @@
final int id = i;
Thread thread = new Thread( new Runnable() {
public void run() {
-
- SimpleTopicSubscriber sub = new
SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis()+"-"+id,
TOPIC_NAME);
+ new SimpleTopicSubscriber(CONNECTION_URL,
System.currentTimeMillis()+"-"+id, TOPIC_NAME);
}
} );
thread.start();
@@ -196,7 +197,13 @@
Thread.sleep(10000);
assertEquals(0, exceptions.size());
}
-
+
+ // makes heavy use of threads and can demonstrate
https://issues.apache.org/activemq/browse/AMQ-2028
+ // with use dedicatedTaskRunner=true and produce OOM
+ public void initCombosForTestConcurrentDurableConsumer() {
+ addCombinationValues("useDedicatedTaskRunner", new Object[]
{Boolean.TRUE, Boolean.FALSE});
+ }
+
public void testConcurrentDurableConsumer() throws Exception {
broker.start();
@@ -251,7 +258,7 @@
}
};
- ExecutorService executor = Executors.newCachedThreadPool();
+ ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
for (int i=0; i<numConsumers ; i++) {
executor.execute(consumer);
@@ -366,8 +373,7 @@
}
protected void tearDown() throws Exception {
- super.tearDown();
-
+ super.tearDown();
if (broker != null) {
broker.stop();
broker = null;
@@ -396,11 +402,20 @@
answer.setUseShutdownHook(false);
answer.setUseJmx(false);
answer.setAdvisorySupport(false);
+ answer.setDedicatedTaskRunner(useDedicatedTaskRunner);
}
protected ActiveMQConnectionFactory createConnectionFactory() throws
Exception {
- return new ActiveMQConnectionFactory(bindAddress);
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(bindAddress);
+ factory.setUseDedicatedTaskRunner(useDedicatedTaskRunner);
+ return factory;
}
-
+ public static Test suite() {
+ return suite(DurableConsumerTest.class);
+ }
+
+ public static void main(String[] args) {
+ junit.textui.TestRunner.run(suite());
+ }
}
Modified:
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java?rev=835921&r1=835920&r2=835921&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java
Fri Nov 13 17:04:09 2009
@@ -32,7 +32,6 @@
public class DiscoveryTransportNoBrokerTest extends CombinationTestSupport {
private static final Log LOG =
LogFactory.getLog(DiscoveryTransportNoBrokerTest.class);
-
public void testNoExtraThreads() throws Exception {
BrokerService broker = new BrokerService();
@@ -86,7 +85,7 @@
connection.setClientID("test");
fail("Did not fail to connect as expected.");
}
- catch ( JMSException expected ) {
+ catch ( JMSException expected ) {
assertTrue("reason is java.io.IOException, was: " +
expected.getCause(), expected.getCause() instanceof java.io.IOException);
}
}
@@ -107,10 +106,10 @@
Connection connection = factory.createConnection();
connection.setClientID("test");
fail("Did not fail to connect as expected.");
- } catch ( JMSException expected ) {
+ } catch ( JMSException expected ) {
assertTrue("reason is java.io.IOException, was: " +
expected.getCause(), expected.getCause() instanceof java.io.IOException);
long duration = System.currentTimeMillis() - startT;
- assertTrue("took at least initialReconnectDelay time: " +
duration, duration >= initialReconnectDelay);
+ assertTrue("took at least initialReconnectDelay time: " + duration
+ " e:" + expected, duration >= initialReconnectDelay);
}
}
}