Author: cmacnaug
Date: Wed Dec 16 22:09:53 2009
New Revision: 891451
URL: http://svn.apache.org/viewvc?rev=891451&view=rev
Log:
Fixing a synchronization hole that can result in missed dispatcher wakeups.
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java?rev=891451&r1=891450&r2=891451&view=diff
==============================================================================
---
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
(original)
+++
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
Wed Dec 16 22:09:53 2009
@@ -17,6 +17,8 @@
package org.apache.activemq.dispatch.internal.simple;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -24,103 +26,103 @@
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
final public class DispatcherThread extends Thread {
-
+
private static final int MAX_LOCAL_DISPATCH_BEFORE_CHECKING_GLOBAL = 1000;
private final SimpleDispatcher dispatcher;
final ThreadDispatchQueue[] threadQueues;
final AtomicLong threadQueuedRunnables = new AtomicLong();
final IntegerCounter executionCounter = new IntegerCounter();
ThreadDispatchQueue currentThreadQueue;
-
+
public DispatcherThread(SimpleDispatcher dispatcher, int ordinal) {
this.dispatcher = dispatcher;
this.threadQueues = new
ThreadDispatchQueue[dispatcher.globalQueues.length];
for (int i = 0; i < threadQueues.length; i++) {
threadQueues[i] = new ThreadDispatchQueue(this,
dispatcher.globalQueues[i]);
}
- setName(dispatcher.getLabel()+" dispatcher: "+(ordinal+1));
+ setName(dispatcher.getLabel() + " dispatcher: " + (ordinal + 1));
setDaemon(true);
}
-
+
@Override
public void run() {
GlobalDispatchQueue[] globalQueues = dispatcher.globalQueues;
final int PRIORITIES = threadQueues.length;
int processGlobalQueueCount = PRIORITIES;
-
+
try {
- start: for(;;) {
-
+ start: for (;;) {
+
executionCounter.set(MAX_LOCAL_DISPATCH_BEFORE_CHECKING_GLOBAL);
-
+
// Process the local non-synchronized queues.
// least contention
- outer: while( executionCounter.get() > 0 ) {
- processGlobalQueueCount=PRIORITIES;
- for (int i=0; i < PRIORITIES; i++) {
+ outer: while (executionCounter.get() > 0) {
+ processGlobalQueueCount = PRIORITIES;
+ for (int i = 0; i < PRIORITIES; i++) {
currentThreadQueue = threadQueues[i];
Runnable runnable = currentThreadQueue.pollLocal();
- if( runnable==null ) {
+ if (runnable == null) {
continue;
}
-
+
SimpleDispatcher.CURRENT_QUEUE.set(currentThreadQueue.globalQueue);
- processGlobalQueueCount=i;
- for(;;) {
+ processGlobalQueueCount = i;
+ for (;;) {
dispatch(runnable);
- if( executionCounter.decrementAndGet() <= 0 ) {
+ if (executionCounter.decrementAndGet() <= 0) {
break outer;
}
runnable = currentThreadQueue.pollLocal();
- if( runnable == null ) {
+ if (runnable == null) {
break;
}
}
}
-
+
// There was no work to do in the local queues..
- if( processGlobalQueueCount == PRIORITIES) {
+ if (processGlobalQueueCount == PRIORITIES) {
break;
}
}
-
- // Process the local synchronized queues.
+
+ // Process the local synchronized queues.
// medium contention
- outer: while( executionCounter.get() > 0 ) {
- processGlobalQueueCount=PRIORITIES;
- for (int i=0; i < PRIORITIES; i++) {
+ outer: while (executionCounter.get() > 0) {
+ processGlobalQueueCount = PRIORITIES;
+ for (int i = 0; i < PRIORITIES; i++) {
currentThreadQueue = threadQueues[i];
Runnable runnable = currentThreadQueue.pollShared();
- if( runnable==null ) {
+ if (runnable == null) {
continue;
}
SimpleDispatcher.CURRENT_QUEUE.set(currentThreadQueue.globalQueue);
- processGlobalQueueCount=i;
- for(;;) {
+ processGlobalQueueCount = i;
+ for (;;) {
dispatch(runnable);
- if( executionCounter.decrementAndGet() <= 0 ) {
+ if (executionCounter.decrementAndGet() <= 0) {
break outer;
}
runnable = currentThreadQueue.pollShared();
- if( runnable == null ) {
+ if (runnable == null) {
break;
}
}
}
-
+
// There was no work to do in the local queues..
- if( processGlobalQueueCount == PRIORITIES) {
+ if (processGlobalQueueCount == PRIORITIES) {
break;
}
}
-
- // Process the global synchronized queues.
+
+ // Process the global synchronized queues.
// most contention
- for (int i=0; i < processGlobalQueueCount; i++) {
+ for (int i = 0; i < processGlobalQueueCount; i++) {
currentThreadQueue = threadQueues[i];
GlobalDispatchQueue queue = globalQueues[i];
Runnable runnable = queue.poll();
- if( runnable==null ) {
+ if (runnable == null) {
continue;
}
// We only execute 1 global runnable at a time,
@@ -129,11 +131,11 @@
dispatch(runnable);
continue start;
}
-
- if(
executionCounter.get()!=MAX_LOCAL_DISPATCH_BEFORE_CHECKING_GLOBAL ) {
+
+ if (executionCounter.get() !=
MAX_LOCAL_DISPATCH_BEFORE_CHECKING_GLOBAL) {
continue start;
}
-
+
// If we get here then there was no work in the global queues..
try {
waitForWakeup();
@@ -141,11 +143,11 @@
e.printStackTrace();
return;
}
- }
+ }
} catch (Shutdown e) {
}
}
-
+
@SuppressWarnings("serial")
static class Shutdown extends RuntimeException {
}
@@ -159,41 +161,39 @@
e.printStackTrace();
}
}
-
+
public static DispatcherThread currentDispatcherThread() {
Thread currentThread = Thread.currentThread();
- if( currentThread.getClass() == DispatcherThread.class ) {
+ if (currentThread.getClass() == DispatcherThread.class) {
return (DispatcherThread) currentThread;
}
return null;
}
- private final Object wakeupMutex = new Object();
- private boolean inWaitingList;
-
+ private final Semaphore wakeups = new Semaphore(0);
+ private final AtomicBoolean inWaitingList = new AtomicBoolean(false);
+
private void waitForWakeup() throws InterruptedException {
- while( threadQueuedRunnables.get()==0 &&
dispatcher.globalQueuedRunnables.get()==0 ) {
- synchronized(wakeupMutex) {
- if( !inWaitingList ) {
- dispatcher.addWaitingDispatcher(this);
- inWaitingList=true;
+ while (threadQueuedRunnables.get() == 0 &&
dispatcher.globalQueuedRunnables.get() == 0) {
+ if (!wakeups.tryAcquire()) {
+ if (inWaitingList.compareAndSet(false, true)) {
+ if (!dispatcher.addWaitingDispatcher(this)) {
+ inWaitingList.set(false);
+ }
}
- wakeupMutex.wait();
}
+ wakeups.acquire();
}
+ wakeups.drainPermits();
}
public void globalWakeup() {
- synchronized(wakeupMutex) {
- inWaitingList=false;
- wakeupMutex.notify();
- }
+ wakeups.release();
+ inWaitingList.set(false);
}
-
+
public void wakeup() {
- synchronized(wakeupMutex) {
- wakeupMutex.notify();
- }
+ wakeups.release();
}
-
+
}
\ No newline at end of file
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java?rev=891451&r1=891450&r2=891451&view=diff
==============================================================================
---
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
(original)
+++
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
Wed Dec 16 22:09:53 2009
@@ -32,32 +32,30 @@
import static org.apache.activemq.dispatch.DispatchPriority.*;
-
-
/**
* Implements a simple dispatch system.
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
final public class SimpleDispatcher extends BaseRetained implements Dispatcher
{
-
+
public final static ThreadLocal<SimpleQueue> CURRENT_QUEUE = new
ThreadLocal<SimpleQueue>();
final SerialDispatchQueue mainQueue = new SerialDispatchQueue(this,
"main");
- final GlobalDispatchQueue globalQueues[];
+ final GlobalDispatchQueue globalQueues[];
final DispatcherThread dispatchers[];
final AtomicLong globalQueuedRunnables = new AtomicLong();
-
+
final ConcurrentLinkedQueue<DispatcherThread> waitingDispatchers = new
ConcurrentLinkedQueue<DispatcherThread>();
final AtomicInteger waitingDispatcherCount = new AtomicInteger();
private final String label;
TimerThread timerThread;
-
+
public SimpleDispatcher(DispatcherConfig config) {
this.label = config.getLabel();
globalQueues = new GlobalDispatchQueue[3];
for (int i = 0; i < 3; i++) {
- globalQueues[i] = new GlobalDispatchQueue(this,
DispatchPriority.values()[i] );
+ globalQueues[i] = new GlobalDispatchQueue(this,
DispatchPriority.values()[i]);
}
dispatchers = new DispatcherThread[config.getThreads()];
}
@@ -65,7 +63,7 @@
public DispatchQueue getMainQueue() {
return mainQueue;
}
-
+
public DispatchQueue getGlobalQueue() {
return getGlobalQueue(DEFAULT);
}
@@ -73,13 +71,13 @@
public DispatchQueue getGlobalQueue(DispatchPriority priority) {
return globalQueues[priority.ordinal()];
}
-
+
public DispatchQueue createSerialQueue(String label, DispatchOption...
options) {
AbstractSerialDispatchQueue rc = new SerialDispatchQueue(this, label,
options);
rc.setTargetQueue(getGlobalQueue());
return rc;
}
-
+
public void dispatchMain() {
mainQueue.run();
}
@@ -88,16 +86,22 @@
return null;
}
- public void addWaitingDispatcher(DispatcherThread dispatcher) {
- waitingDispatcherCount.incrementAndGet();
- waitingDispatchers.add(dispatcher);
+ public boolean addWaitingDispatcher(DispatcherThread dispatcher) {
+ if (globalQueuedRunnables.get() <= 0) {
+ waitingDispatcherCount.incrementAndGet();
+ waitingDispatchers.add(dispatcher);
+ return true;
+ } else {
+ dispatcher.globalWakeup();
+ return false;
+ }
}
-
+
public void wakeup() {
int value = waitingDispatcherCount.get();
- if( value!=0 ) {
+ if (value != 0) {
DispatcherThread dispatcher = waitingDispatchers.poll();
- if( dispatcher!=null ) {
+ if (dispatcher != null) {
waitingDispatcherCount.decrementAndGet();
dispatcher.globalWakeup();
}
@@ -114,11 +118,12 @@
}
public void shutdown() {
-
+
Runnable countDown = new Runnable() {
AtomicInteger shutdownCountDown = new
AtomicInteger(dispatchers.length);
+
public void run() {
- if( shutdownCountDown.decrementAndGet()==0 ) {
+ if (shutdownCountDown.decrementAndGet() == 0) {
// Notify any registered shutdown watchers.
SimpleDispatcher.super.shutdown();
}
@@ -140,10 +145,10 @@
public DispatchQueue getCurrentQueue() {
return CURRENT_QUEUE.get();
}
-
+
public DispatchQueue getCurrentThreadQueue() {
DispatcherThread thread = DispatcherThread.currentDispatcherThread();
- if( thread == null ) {
+ if (thread == null) {
return null;
}
return thread.currentThreadQueue;