Author: chirino
Date: Tue Dec 15 17:03:19 2009
New Revision: 890887
URL: http://svn.apache.org/viewvc?rev=890887&view=rev
Log:
Optimizing the simple queue impl
Added:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/IntegerCounter.java
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java?rev=890887&r1=890886&r2=890887&view=diff
==============================================================================
---
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java
(original)
+++
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java
Tue Dec 15 17:03:19 2009
@@ -31,7 +31,7 @@
public void dispatchMain();
public DispatchQueue getCurrentQueue();
-
+
public DispatchSource createSource(SelectableChannel channel, int
interestOps, DispatchQueue queue);
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java?rev=890887&r1=890886&r2=890887&view=diff
==============================================================================
---
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
(original)
+++
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
Tue Dec 15 17:03:19 2009
@@ -34,11 +34,13 @@
*/
abstract public class AbstractSerialDispatchQueue extends
AbstractDispatchObject implements DispatchQueue, Runnable {
- private final String label;
- private final AtomicInteger suspendCounter = new AtomicInteger();
- private final ConcurrentLinkedQueue<Runnable> runnables = new
ConcurrentLinkedQueue<Runnable>();
- private final AtomicLong size = new AtomicLong();
- private final Set<DispatchOption> options;
+ protected final String label;
+ protected final AtomicInteger suspendCounter = new AtomicInteger();
+
+ protected final ConcurrentLinkedQueue<Runnable> runnables = new
ConcurrentLinkedQueue<Runnable>();
+
+ protected final AtomicLong size = new AtomicLong();
+ protected final Set<DispatchOption> options;
public AbstractSerialDispatchQueue(String label, DispatchOption...options)
{
this.label = label;
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java?rev=890887&r1=890886&r2=890887&view=diff
==============================================================================
---
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
(original)
+++
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
Tue Dec 15 17:03:19 2009
@@ -199,6 +199,14 @@
public DispatchQueue getCurrentQueue() {
return CURRENT_QUEUE.get();
+ }
+
+ public DispatchQueue getCurrentThreadQueue() {
+ DispatcherThread thread = DispatcherThread.CURRENT.get();
+ if( thread==null ) {
+ return null;
+ }
+ return thread.currentDispatchQueue;
}
}
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java?rev=890887&r1=890886&r2=890887&view=diff
==============================================================================
---
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
(original)
+++
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
Tue Dec 15 17:03:19 2009
@@ -33,7 +33,7 @@
static public final ThreadLocal<DispatcherThread> CURRENT = new
ThreadLocal<DispatcherThread>();
- private final ThreadDispatchQueue dispatchQueues[];
+ final ThreadDispatchQueue dispatchQueues[];
static final boolean DEBUG = false;
private Thread thread;
@@ -50,6 +50,8 @@
// Dispatch queue for requests from other threads:
final LinkedNodeList<ForeignEvent>[] foreignQueue = createForeignQueue();
+
+ ThreadDispatchQueue currentDispatchQueue;
private static final int[] TOGGLE = new int[] { 1, 0 };
int foreignToggle = 0;
@@ -168,7 +170,8 @@
// If no local work available wait for foreign work:
while((pdc = priorityQueue.poll())!=null){
if( pdc.priority < dispatchQueues.length ) {
-
AdvancedDispatcher.CURRENT_QUEUE.set(dispatchQueues[pdc.priority]);
+ currentDispatchQueue = dispatchQueues[pdc.priority];
+
AdvancedDispatcher.CURRENT_QUEUE.set(currentDispatchQueue);
}
if (pdc.tracker != null) {
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java?rev=890887&r1=890886&r2=890887&view=diff
==============================================================================
---
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
(original)
+++
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
Tue Dec 15 17:03:19 2009
@@ -91,7 +91,7 @@
}
public DispatchQueue getTargetQueue() {
- throw new UnsupportedOperationException();
+ return null;
}
public void release() {
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java?rev=890887&r1=890886&r2=890887&view=diff
==============================================================================
---
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
(original)
+++
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
Tue Dec 15 17:03:19 2009
@@ -24,10 +24,13 @@
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
final public class DispatcherThread extends Thread {
- private static final int MAX_DISPATCH_BEFORE_CHECKING_FOR_HIGHER_PRIO =
10000;
+
+ private static final int MAX_LOCAL_DISPATCH_BEFORE_CHECKING_GLOBAL = 1000;
private final SimpleDispatcher dispatcher;
final ThreadDispatchQueue[] threadQueues;
final AtomicLong threadQueuedRunnables = new AtomicLong();
+ final IntegerCounter executionCounter = new IntegerCounter();
+ ThreadDispatchQueue currentThreadQueue;
public DispatcherThread(SimpleDispatcher dispatcher, int ordinal) {
this.dispatcher = dispatcher;
@@ -42,63 +45,103 @@
@Override
public void run() {
GlobalDispatchQueue[] globalQueues = dispatcher.globalQueues;
+ final int PRIORITIES = threadQueues.length;
+ int processGlobalQueueCount = PRIORITIES;
+
try {
- outer: while( true ) {
- int counter=0;
- for (ThreadDispatchQueue queue : threadQueues) {
- SimpleDispatcher.CURRENT_QUEUE.set(queue.globalQueue);
- Runnable runnable;
- while( (runnable = queue.poll())!=null ) {
- dispatch(runnable);
- counter++;
+ start: for(;;) {
+
+
executionCounter.set(MAX_LOCAL_DISPATCH_BEFORE_CHECKING_GLOBAL);
+
+ // Process the local non-synchronized queues.
+ // least contention
+ outer: while( executionCounter.get() > 0 ) {
+ processGlobalQueueCount=PRIORITIES;
+ for (int i=0; i < PRIORITIES; i++) {
+ currentThreadQueue = threadQueues[i];
+ Runnable runnable = currentThreadQueue.pollLocal();
+ if( runnable==null ) {
+ continue;
+ }
+
+
SimpleDispatcher.CURRENT_QUEUE.set(currentThreadQueue.globalQueue);
+ processGlobalQueueCount=i;
+ for(;;) {
+ dispatch(runnable);
+ if( executionCounter.decrementAndGet() <= 0 ) {
+ break outer;
+ }
+ runnable = currentThreadQueue.pollLocal();
+ if( runnable == null ) {
+ break;
+ }
+ }
+ }
+
+ // There was no work to do in the local queues..
+ if( processGlobalQueueCount == PRIORITIES) {
+ break;
}
- }
- if( counter!=0 ) {
- // don't service the global queues until the thread queues
are
- // drained.
- continue;
}
- for (SimpleQueue queue : globalQueues) {
- SimpleDispatcher.CURRENT_QUEUE.set(queue);
-
- Runnable runnable;
- while( (runnable = queue.poll())!=null ) {
- dispatch(runnable);
- counter++;
-
- // Thread queues have the priority.
- if( threadQueuedRunnables.get()!=0 ) {
- continue outer;
+ // Process the local synchronized queues.
+ // medium contention
+ outer: while( executionCounter.get() > 0 ) {
+ processGlobalQueueCount=PRIORITIES;
+ for (int i=0; i < PRIORITIES; i++) {
+ currentThreadQueue = threadQueues[i];
+ Runnable runnable = currentThreadQueue.pollShared();
+ if( runnable==null ) {
+ continue;
+ }
+
SimpleDispatcher.CURRENT_QUEUE.set(currentThreadQueue.globalQueue);
+ processGlobalQueueCount=i;
+ for(;;) {
+ dispatch(runnable);
+ if( executionCounter.decrementAndGet() <= 0 ) {
+ break outer;
+ }
+ runnable = currentThreadQueue.pollShared();
+ if( runnable == null ) {
+ break;
+ }
}
}
+
+ // There was no work to do in the local queues..
+ if( processGlobalQueueCount == PRIORITIES) {
+ break;
+ }
+ }
+
+ // Process the global synchronized queues.
+ // most contention
+ for (int i=0; i < processGlobalQueueCount; i++) {
+ currentThreadQueue = threadQueues[i];
+ GlobalDispatchQueue queue = globalQueues[i];
+ Runnable runnable = queue.poll();
+ if( runnable==null ) {
+ continue;
+ }
+ // We only execute 1 global runnable at a time,
+ // hoping it generates local work for us.
+ SimpleDispatcher.CURRENT_QUEUE.set(queue);
+ dispatch(runnable);
+ continue start;
+ }
+
+ if(
executionCounter.get()!=MAX_LOCAL_DISPATCH_BEFORE_CHECKING_GLOBAL ) {
+ continue start;
}
- if( counter!=0 ) {
- // don't wait for wake up until we could find
- // no runnables to dispatch.
- continue;
- }
-
-// GlobalDispatchQueue[] globalQueues = dispatcher.globalQueues;
-// while( true ) {
-//
-// if( dispatch(threadQueues[0])
-// || dispatch(globalQueues[0])
-// || dispatch(threadQueues[1])
-// || dispatch(globalQueues[1])
-// || dispatch(threadQueues[2])
-// || dispatch(globalQueues[2])
-// ) {
-// continue;
-// }
-//
+
+ // If we get here then there was no work in the global queues..
try {
waitForWakeup();
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
- }
+ }
} catch (Shutdown e) {
}
}
@@ -107,23 +150,6 @@
static class Shutdown extends RuntimeException {
}
- private boolean dispatch(SimpleQueue queue) {
- int counter=0;
- Runnable runnable;
- while( counter < MAX_DISPATCH_BEFORE_CHECKING_FOR_HIGHER_PRIO ) {
- runnable = queue.poll();
- if( runnable == null ) {
- break;
- }
- if( counter==0 ) {
- SimpleDispatcher.CURRENT_QUEUE.set(queue);
- }
- dispatch(runnable);
- counter++;
- }
- return counter!=0;
- }
-
private void dispatch(Runnable runnable) {
try {
runnable.run();
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java?rev=890887&r1=890886&r2=890887&view=diff
==============================================================================
---
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
(original)
+++
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
Tue Dec 15 17:03:19 2009
@@ -17,15 +17,14 @@
package org.apache.activemq.dispatch.internal.simple;
import java.util.Collections;
-import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.dispatch.DispatchOption;
-import org.apache.activemq.dispatch.DispatchQueue;
import org.apache.activemq.dispatch.DispatchPriority;
+import org.apache.activemq.dispatch.DispatchQueue;
import org.apache.activemq.dispatch.internal.QueueSupport;
/**
@@ -56,9 +55,14 @@
}
public void dispatchAsync(Runnable runnable) {
- this.counter.incrementAndGet();
- runnables.add(runnable);
- dispatcher.wakeup();
+ DispatcherThread thread = DispatcherThread.currentDispatcherThread();
+ if( thread==null ) {
+ this.counter.incrementAndGet();
+ runnables.add(runnable);
+ dispatcher.wakeup();
+ } else {
+ thread.currentThreadQueue.dispatchAsync(runnable);
+ }
}
public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
Added:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/IntegerCounter.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/IntegerCounter.java?rev=890887&view=auto
==============================================================================
---
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/IntegerCounter.java
(added)
+++
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/IntegerCounter.java
Tue Dec 15 17:03:19 2009
@@ -0,0 +1,81 @@
+package org.apache.activemq.dispatch.internal.simple;
+
+
+public class IntegerCounter {
+
+ int counter;
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ IntegerCounter other = (IntegerCounter) obj;
+ if (counter != other.counter)
+ return false;
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + counter;
+ return result;
+ }
+
+ public final int addAndGet(int delta) {
+ counter+=delta;
+ return counter;
+ }
+
+ public final int decrementAndGet() {
+ return --counter;
+ }
+
+ public final int get() {
+ return counter;
+ }
+
+ public final int getAndAdd(int delta) {
+ int rc = counter;
+ counter += delta;
+ return rc;
+ }
+
+ public final int getAndDecrement() {
+ int rc = counter;
+ counter --;
+ return rc;
+ }
+
+ public final int getAndIncrement() {
+ return counter++;
+ }
+
+ public final int getAndSet(int newValue) {
+ int rc = counter;
+ counter = newValue;
+ return rc;
+ }
+
+ public final int incrementAndGet() {
+ return ++counter;
+ }
+
+ public int intValue() {
+ return counter;
+ }
+
+ public final void set(int newValue) {
+ counter = newValue;
+ }
+
+ public String toString() {
+ return Integer.toString(counter);
+ }
+
+}
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java?rev=890887&r1=890886&r2=890887&view=diff
==============================================================================
---
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
(original)
+++
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
Tue Dec 15 17:03:19 2009
@@ -17,6 +17,7 @@
package org.apache.activemq.dispatch.internal.simple;
+import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.dispatch.DispatchOption;
@@ -29,10 +30,15 @@
private final SimpleDispatcher dispatcher;
private volatile boolean stickToThreadOnNextDispatch;
private volatile boolean stickToThreadOnNextDispatchRequest;
+ private final LinkedList<Runnable> localEnqueues = new
LinkedList<Runnable>();
+ private final ThreadLocal<Boolean> executing = new ThreadLocal<Boolean>();
SerialDispatchQueue(SimpleDispatcher dispatcher, String label,
DispatchOption...options) {
super(label, options);
this.dispatcher = dispatcher;
+ if( getOptions().contains(DispatchOption.STICK_TO_DISPATCH_THREAD) ) {
+ stickToThreadOnNextDispatch=true;
+ }
}
@Override
@@ -45,12 +51,8 @@
}
@Override
- public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
- dispatcher.timerThread.addRelative(runnable, this, delay, unit);
- }
-
- @Override
- protected void dispatchSelfAsync() {
+ public void dispatchAsync(Runnable runnable) {
+
if( stickToThreadOnNextDispatchRequest ) {
SimpleQueue current = SimpleDispatcher.CURRENT_QUEUE.get();
if( current!=null ) {
@@ -65,10 +67,15 @@
stickToThreadOnNextDispatchRequest=false;
}
}
- super.dispatchSelfAsync();
+
+ // We can take a shortcut...
+ if( executing.get()!=null ) {
+ localEnqueues.add(runnable);
+ } else {
+ super.dispatchAsync(runnable);
+ }
}
- @Override
public void run() {
SimpleQueue current = SimpleDispatcher.CURRENT_QUEUE.get();
if( stickToThreadOnNextDispatch ) {
@@ -78,14 +85,62 @@
setTargetQueue(global.getTargetQueue());
}
}
+
+ DispatcherThread thread = DispatcherThread.currentDispatcherThread();
+
SimpleDispatcher.CURRENT_QUEUE.set(this);
+ executing.set(true);
try {
- super.run();
+
+ Runnable runnable;
+ long lsize = size.get();
+ while( suspendCounter.get() <= 0 && lsize > 0 ) {
+
+ runnable = runnables.poll();
+ if( runnable!=null ) {
+ runnable.run();
+ lsize = size.decrementAndGet();
+ if( lsize==0 ) {
+ release();
+ }
+ if( thread.executionCounter.decrementAndGet() <= 0 ) {
+ return;
+ }
+ }
+ }
+
+ while( (runnable = localEnqueues.poll())!=null ) {
+ runnable.run();
+ if( thread.executionCounter.decrementAndGet() <= 0 ) {
+ return;
+ }
+ }
+
} finally {
+ executing.remove();
+
+ if( !localEnqueues.isEmpty() ) {
+
+ long lastSize = size.getAndAdd(localEnqueues.size());
+ if( lastSize==0 ) {
+ retain();
+ }
+ runnables.addAll(localEnqueues);
+ localEnqueues.clear();
+
+ if( suspendCounter.get()<=0 ) {
+ dispatchSelfAsync();
+ }
+ }
SimpleDispatcher.CURRENT_QUEUE.set(current);
}
}
+ @Override
+ public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
+ dispatcher.timerThread.addRelative(runnable, this, delay, unit);
+ }
+
public DispatchPriority getPriority() {
throw new UnsupportedOperationException();
}
@@ -109,4 +164,5 @@
public SimpleQueue getTargetQueue() {
return (SimpleQueue) targetQueue;
}
+
}
\ No newline at end of file
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java?rev=890887&r1=890886&r2=890887&view=diff
==============================================================================
---
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
(original)
+++
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
Tue Dec 15 17:03:19 2009
@@ -116,7 +116,7 @@
public void shutdown() {
Runnable countDown = new Runnable() {
- AtomicInteger shutdownCountDown = new
AtomicInteger(dispatchers.length+1);
+ AtomicInteger shutdownCountDown = new
AtomicInteger(dispatchers.length);
public void run() {
if( shutdownCountDown.decrementAndGet()==0 ) {
// Notify any registered shutdown watchers.
@@ -126,10 +126,10 @@
}
};
- timerThread.shutdown(countDown);
+ timerThread.shutdown(null);
for (int i = 0; i < dispatchers.length; i++) {
ThreadDispatchQueue queue =
dispatchers[i].threadQueues[LOW.ordinal()];
- queue.runnables.add(countDown);
+ queue.dispatchAsync(countDown);
}
}
@@ -141,4 +141,12 @@
return CURRENT_QUEUE.get();
}
+ public DispatchQueue getCurrentThreadQueue() {
+ DispatcherThread thread = DispatcherThread.currentDispatcherThread();
+ if( thread == null ) {
+ return null;
+ }
+ return thread.currentThreadQueue;
+ }
+
}
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java?rev=890887&r1=890886&r2=890887&view=diff
==============================================================================
---
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java
(original)
+++
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java
Tue Dec 15 17:03:19 2009
@@ -5,7 +5,6 @@
public interface SimpleQueue extends DispatchQueue {
- Runnable poll();
DispatchPriority getPriority();
SerialDispatchQueue isSerialDispatchQueue();
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java?rev=890887&r1=890886&r2=890887&view=diff
==============================================================================
---
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
(original)
+++
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
Tue Dec 15 17:03:19 2009
@@ -69,24 +69,18 @@
}
}
- public Runnable poll() {
-
- // This method should only be called by our dispatcher
- // thread.
- assert Thread.currentThread()==dispatcher;
-
- Runnable rc = localRunnables.poll();
- if( rc !=null ) {
- return rc;
- }
-
- rc = runnables.poll();
+ public Runnable pollShared() {
+ Runnable rc = runnables.poll();
if( rc !=null ) {
counter.decrementAndGet();
}
return rc;
}
+ public Runnable pollLocal() {
+ return localRunnables.poll();
+ }
+
public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
throw new RuntimeException("TODO: implement me.");
}
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java?rev=890887&r1=890886&r2=890887&view=diff
==============================================================================
---
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
(original)
+++
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
Tue Dec 15 17:03:19 2009
@@ -22,8 +22,6 @@
import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatcher;
import org.apache.activemq.dispatch.internal.simple.SimpleDispatcher;
-import static org.apache.activemq.dispatch.DispatchPriority.*;
-
import static java.lang.String.*;
/**
@@ -35,33 +33,35 @@
public static void main(String[] args) throws Exception {
Dispatcher advancedSystem = new AdvancedDispatcher(new
DispatcherConfig());
advancedSystem.retain();
- benchmark("advanced global queue", advancedSystem,
advancedSystem.getGlobalQueue(DEFAULT));
- benchmark("advanced private serial queue", advancedSystem,
advancedSystem.createSerialQueue("test",
DispatchOption.STICK_TO_CALLER_THREAD));
+ benchmarkGlobal("advanced global queue", advancedSystem);
+ benchmarkSerial("advanced private serial queue", advancedSystem);
RunnableCountDownLatch latch = new RunnableCountDownLatch(1);
advancedSystem.addShutdownWatcher(latch);
advancedSystem.release();
latch.await();
- Dispatcher simpleSystem = new SimpleDispatcher(new DispatcherConfig());
+ DispatcherConfig config = new DispatcherConfig();
+ config.setThreads(6);
+ Dispatcher simpleSystem = new SimpleDispatcher(config);
simpleSystem.retain();
- benchmark("simple global queue", simpleSystem,
simpleSystem.getGlobalQueue(DEFAULT));
- benchmark("simple private serial queue", simpleSystem,
simpleSystem.createSerialQueue("test", DispatchOption.STICK_TO_CALLER_THREAD));
+ benchmarkGlobal("simple global queue", simpleSystem);
+ benchmarkSerial("simple private serial queue", simpleSystem);
latch = new RunnableCountDownLatch(1);
- advancedSystem.addShutdownWatcher(latch);
- advancedSystem.release();
+ simpleSystem.addShutdownWatcher(latch);
+ simpleSystem.release();
latch.await();
}
- private static void benchmark(String name, Dispatcher dispatcher,
DispatchQueue queue) throws InterruptedException {
+ private static void benchmarkSerial(String name, Dispatcher dispatcher)
throws InterruptedException {
// warm the JIT up..
- benchmarkWork(dispatcher, queue, 100000);
+ benchmarkSerialWork(dispatcher, 100000);
int iterations = 1000*1000*20;
long start = System.nanoTime();
- benchmarkWork(dispatcher, queue, iterations);
+ benchmarkSerialWork(dispatcher, iterations);
long end = System.nanoTime();
double durationMS = 1.0d*(end-start)/1000000d;
@@ -70,13 +70,14 @@
System.out.println(format("name: %s, duration: %,.3f ms, rate: %,.2f
executions/sec", name, durationMS, rate));
}
- private static void benchmarkWork(final Dispatcher dispatcher, final
DispatchQueue queue, int iterations) throws InterruptedException {
+ private static void benchmarkSerialWork(final Dispatcher dispatcher, int
iterations) throws InterruptedException {
+ final DispatchQueue queue = dispatcher.createSerialQueue(null,
DispatchOption.STICK_TO_CALLER_THREAD);
final CountDownLatch counter = new CountDownLatch(iterations);
Runnable task = new Runnable(){
public void run() {
counter.countDown();
if( counter.getCount()>0 ) {
- dispatcher.getCurrentQueue().dispatchAsync(this);
+ queue.dispatchAsync(this);
}
}
};
@@ -85,4 +86,51 @@
}
counter.await();
}
+
+ private static void benchmarkGlobal(String name, Dispatcher dispatcher)
throws InterruptedException {
+ // warm the JIT up..
+ benchmarkGlobalWork(dispatcher, 100000);
+
+ int iterations = 1000*1000*20;
+ long start = System.nanoTime();
+ benchmarkGlobalWork(dispatcher, iterations);
+ long end = System.nanoTime();
+
+ double durationMS = 1.0d*(end-start)/1000000d;
+ double rate = 1000d * iterations / durationMS;
+
+ System.out.println(format("name: %s, duration: %,.3f ms, rate: %,.2f
executions/sec", name, durationMS, rate));
+ }
+
+
+ private static final class TestRunnable implements Runnable {
+ private final int counter;
+ private final Runnable onDone;
+ private final DispatchQueue queue;
+
+ private TestRunnable(int counter, DispatchQueue queue, Runnable
onDone) {
+ this.counter=counter;
+ this.onDone = onDone;
+ this.queue = queue;
+ }
+
+ public void run() {
+ if( counter ==0 ) {
+ onDone.run();
+ } else {
+ queue.dispatchAsync(new TestRunnable(counter-1, queue,
onDone));
+ }
+ }
+ }
+
+ private static void benchmarkGlobalWork(final Dispatcher dispatcher, int
iterations) throws InterruptedException {
+ final DispatchQueue queue = dispatcher.getGlobalQueue();
+ int PARTITIONS = 1000;
+ RunnableCountDownLatch counter = new
RunnableCountDownLatch(PARTITIONS);
+ for (int i = 0; i < PARTITIONS; i++) {
+ queue.dispatchAsync(new TestRunnable(iterations/PARTITIONS, queue,
counter));
+ }
+ counter.await();
+ }
+
}