Author: chirino
Date: Wed Dec 16 22:21:06 2009
New Revision: 891454
URL: http://svn.apache.org/viewvc?rev=891454&view=rev
Log:
Pushed down the optimization in the simple serial queue into the abstract serial
queue, so it is also applicable to the advanced impl.
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java?rev=891454&r1=891453&r2=891454&view=diff
==============================================================================
---
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
(original)
+++
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
Wed Dec 16 22:21:06 2009
@@ -19,14 +19,15 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.activemq.dispatch.DispatchQueue;
import org.apache.activemq.dispatch.DispatchOption;
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.internal.simple.IntegerCounter;
/**
*
@@ -36,10 +37,15 @@
protected final String label;
protected final AtomicInteger suspendCounter = new AtomicInteger();
+ protected final AtomicInteger executeCounter = new AtomicInteger();
- protected final ConcurrentLinkedQueue<Runnable> runnables = new
ConcurrentLinkedQueue<Runnable>();
+ protected final AtomicLong externalQueueSize = new AtomicLong();
+ protected final AtomicLong queueSize = new AtomicLong();
+ protected final ConcurrentLinkedQueue<Runnable> externalQueue = new
ConcurrentLinkedQueue<Runnable>();
+
+ private final LinkedList<Runnable> localQueue = new LinkedList<Runnable>();
+ private final ThreadLocal<Boolean> executing = new ThreadLocal<Boolean>();
- protected final AtomicLong size = new AtomicLong();
protected final Set<DispatchOption> options;
public AbstractSerialDispatchQueue(String label, DispatchOption...options)
{
@@ -60,9 +66,7 @@
public void resume() {
if( suspendCounter.decrementAndGet() == 0 ) {
- if( size.get() != 0 ) {
- dispatchSelfAsync();
- }
+ dispatchSelfAsync();
}
}
@@ -75,16 +79,21 @@
}
public void dispatchAsync(Runnable runnable) {
- if( runnable == null ) {
- throw new IllegalArgumentException();
- }
- long lastSize = size.getAndIncrement();
- if( lastSize==0 ) {
+ assert runnable != null;
+
+ if( queueSize.getAndIncrement()==0 ) {
retain();
}
- runnables.add(runnable);
- if( lastSize == 0 && suspendCounter.get()<=0 ) {
- dispatchSelfAsync();
+
+ // We can take a shortcut...
+ if( executing.get()!=null ) {
+ localQueue.add(runnable);
+ } else {
+ long lastSize = externalQueueSize.getAndIncrement();
+ externalQueue.add(runnable);
+ if( lastSize == 0 && suspendCounter.get()<=0 ) {
+ dispatchSelfAsync();
+ }
}
}
@@ -93,20 +102,68 @@
}
public void run() {
- Runnable runnable;
- long lsize = size.get();
- while( suspendCounter.get() <= 0 && lsize > 0 ) {
- try {
- runnable = runnables.poll();
- if( runnable!=null ) {
- runnable.run();
- lsize = size.decrementAndGet();
- if( lsize==0 ) {
- release();
+ IntegerCounter limit = new IntegerCounter();
+ limit.set(1000);
+ dispatch(limit);
+ }
+
+ protected void dispatch(IntegerCounter limit) {
+ executing.set(true);
+ // Protection against concurrent execution...
+ // Many threads can try to get in.. but only the first will win..
+ if( executeCounter.getAndIncrement()==0 ) {
+ // Do additional loops for each thread that could
+ // not make it in. This protects us from exiting
+ // the dispatch loop but still just after a new
+ // thread was trying to get in.
+ do {
+ dispatchLoop(limit);
+ } while( executeCounter.decrementAndGet()>0 );
+ }
+ executing.remove();
+ }
+
+ private void dispatchLoop(IntegerCounter limit) {
+ int counter=0;
+ try {
+
+ Runnable runnable;
+ while( suspendCounter.get() <= 0 ) {
+
+ if( (runnable = localQueue.poll())!=null ) {
+ counter++;
+ try {
+ runnable.run();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ if( limit.decrementAndGet() <= 0 ) {
+ return;
}
+ continue;
}
- } catch (Throwable e) {
- e.printStackTrace();
+
+ long lsize = externalQueueSize.get();
+ if( lsize>0 ) {
+ while( lsize > 0 ) {
+ runnable = externalQueue.poll();
+ if( runnable!=null ) {
+ localQueue.add(runnable);
+ lsize = externalQueueSize.decrementAndGet();
+ }
+ }
+ continue;
+ }
+
+ break;
+ }
+
+ } finally {
+ long size = queueSize.addAndGet(-counter);
+ if( size==0 ) {
+ release();
+ } else {
+ dispatchSelfAsync();
}
}
}
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java?rev=891454&r1=891453&r2=891454&view=diff
==============================================================================
---
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
(original)
+++
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
Wed Dec 16 22:21:06 2009
@@ -17,7 +17,6 @@
package org.apache.activemq.dispatch.internal.simple;
-import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.dispatch.DispatchOption;
@@ -30,8 +29,6 @@
private final SimpleDispatcher dispatcher;
private volatile boolean stickToThreadOnNextDispatch;
private volatile boolean stickToThreadOnNextDispatchRequest;
- private final LinkedList<Runnable> localEnqueues = new
LinkedList<Runnable>();
- private final ThreadLocal<Boolean> executing = new ThreadLocal<Boolean>();
SerialDispatchQueue(SimpleDispatcher dispatcher, String label,
DispatchOption...options) {
super(label, options);
@@ -68,72 +65,28 @@
}
}
- // We can take a shortcut...
- if( executing.get()!=null ) {
- localEnqueues.add(runnable);
- } else {
- super.dispatchAsync(runnable);
- }
+ super.dispatchAsync(runnable);
}
public void run() {
SimpleQueue current = SimpleDispatcher.CURRENT_QUEUE.get();
- if( stickToThreadOnNextDispatch ) {
- stickToThreadOnNextDispatch=false;
- GlobalDispatchQueue global = current.isGlobalDispatchQueue();
- if( global!=null ) {
- setTargetQueue(global.getTargetQueue());
- }
- }
-
- DispatcherThread thread = DispatcherThread.currentDispatcherThread();
-
SimpleDispatcher.CURRENT_QUEUE.set(this);
- executing.set(true);
+
try {
-
- Runnable runnable;
- long lsize = size.get();
- while( suspendCounter.get() <= 0 && lsize > 0 ) {
-
- runnable = runnables.poll();
- if( runnable!=null ) {
- runnable.run();
- lsize = size.decrementAndGet();
- if( lsize==0 ) {
- release();
- }
- if( thread.executionCounter.decrementAndGet() <= 0 ) {
- return;
- }
- }
- }
-
- while( (runnable = localEnqueues.poll())!=null ) {
- runnable.run();
- if( thread.executionCounter.decrementAndGet() <= 0 ) {
- return;
+ if( stickToThreadOnNextDispatch ) {
+ stickToThreadOnNextDispatch=false;
+ GlobalDispatchQueue global = current.isGlobalDispatchQueue();
+ if( global!=null ) {
+ setTargetQueue(global.getTargetQueue());
}
}
+ DispatcherThread thread =
DispatcherThread.currentDispatcherThread();
+ dispatch(thread.executionCounter);
} finally {
- executing.remove();
-
- if( !localEnqueues.isEmpty() ) {
-
- long lastSize = size.getAndAdd(localEnqueues.size());
- if( lastSize==0 ) {
- retain();
- }
- runnables.addAll(localEnqueues);
- localEnqueues.clear();
-
- if( suspendCounter.get()<=0 ) {
- dispatchSelfAsync();
- }
- }
SimpleDispatcher.CURRENT_QUEUE.set(current);
}
+
}
public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java?rev=891454&r1=891453&r2=891454&view=diff
==============================================================================
---
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
(original)
+++
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
Wed Dec 16 22:21:06 2009
@@ -81,10 +81,9 @@
}
}
};
-// for (int i = 0; i < 1000; i++) {
-// queue.dispatchAsync(task);
-// }
- queue.dispatchAsync(task);
+ for (int i = 0; i < 1000; i++) {
+ queue.dispatchAsync(task);
+ }
counter.await();
}