Author: trustin
Date: Sun Oct 31 00:50:58 2004
New Revision: 56129
Added:
incubator/directory/seda/trunk/src/java/org/apache/seda/thread/AbstractThreadPool.java
incubator/directory/seda/trunk/src/java/org/apache/seda/thread/OrderedThreadPool.java
incubator/directory/seda/trunk/src/java/org/apache/seda/thread/RunnableQueue.java
incubator/directory/seda/trunk/src/java/org/apache/seda/thread/SimpleThreadPool.java
Modified:
incubator/directory/seda/trunk/project.xml
incubator/directory/seda/trunk/src/java/org/apache/seda/DefaultFrontendFactory.java
incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java
incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/EncodeStageHandler.java
incubator/directory/seda/trunk/src/java/org/apache/seda/output/TCPOutputManager.java
incubator/directory/seda/trunk/src/java/org/apache/seda/output/UDPOutputManager.java
incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/DefaultRequestProcessor.java
incubator/directory/seda/trunk/src/java/org/apache/seda/stage/DefaultStage.java
incubator/directory/seda/trunk/src/java/org/apache/seda/thread/ThreadPool.java
Log:
* Removed the dependency on commons-threadpool; SEDA now uses home-grown
thread pools for some reason.
* Modified the ThreadPool.execute() signature.
InputEvents and OutputEvents must be passed to the EncoderManager and the
OutputManager in order. I added a 'hint' parameter, usually a ClientKey, which
the newly added OrderedThreadPool uses to pick the same worker thread for
every event that carries the same key.
Modified: incubator/directory/seda/trunk/project.xml
==============================================================================
--- incubator/directory/seda/trunk/project.xml (original)
+++ incubator/directory/seda/trunk/project.xml Sun Oct 31 00:50:58 2004
@@ -117,13 +117,7 @@
<!-- Dependencies required for running test cases and examples -->
<!-- ========================================================= -->
- <dependency>
- <groupId>commons-threadpool</groupId>
- <artifactId>commons-threadpool</artifactId>
- <version>1.0-dev</version>
- <url>http://jakarta.apache.org/commons/sandbox/threadpool</url>
- </dependency>
-
+
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
Modified:
incubator/directory/seda/trunk/src/java/org/apache/seda/DefaultFrontendFactory.java
==============================================================================
---
incubator/directory/seda/trunk/src/java/org/apache/seda/DefaultFrontendFactory.java
(original)
+++
incubator/directory/seda/trunk/src/java/org/apache/seda/DefaultFrontendFactory.java
Sun Oct 31 00:50:58 2004
@@ -19,8 +19,6 @@
import java.io.IOException;
-import org.apache.commons.threadpool.CommonsLoggingThreadPoolMonitor;
-import org.apache.commons.threadpool.DefaultThreadPool;
import org.apache.seda.buffer.BufferPool;
import org.apache.seda.buffer.DefaultBufferPool;
import org.apache.seda.buffer.DefaultBufferPoolConfig;
@@ -40,8 +38,15 @@
import org.apache.seda.listener.TCPListenerManager;
import org.apache.seda.output.OutputManager;
import org.apache.seda.output.TCPOutputManager;
-import org.apache.seda.protocol.*;
+import org.apache.seda.protocol.DefaultInetServicesDatabase;
+import org.apache.seda.protocol.DefaultRequestProcessor;
+import org.apache.seda.protocol.InetServiceEntry;
+import org.apache.seda.protocol.InetServicesDatabase;
+import org.apache.seda.protocol.RequestProcessor;
import org.apache.seda.stage.DefaultStageConfig;
+import org.apache.seda.thread.AbstractThreadPool;
+import org.apache.seda.thread.OrderedThreadPool;
+import org.apache.seda.thread.SimpleThreadPool;
import org.apache.seda.thread.ThreadPool;
@@ -110,19 +115,16 @@
}
// no deps
- private ThreadPool createThreadPool(int threads)
+ private ThreadPool createThreadPool(int threads, boolean ordered)
{
- CommonsLoggingThreadPoolMonitor monitor =
- new CommonsLoggingThreadPoolMonitor();
- final DefaultThreadPool ctp = new DefaultThreadPool(monitor, threads);
- ThreadPool pool =
- new ThreadPool()
- {
- public void execute(Runnable runnable)
- {
- ctp.invokeLater(runnable);
- }
- };
+ AbstractThreadPool pool;
+ if (ordered) {
+ pool = new OrderedThreadPool();
+ } else {
+ pool = new SimpleThreadPool();
+ }
+
+ pool.setThreadPoolSize(threads);
return pool;
}
@@ -133,7 +135,7 @@
InetServicesDatabase inetDb)
{
DefaultStageConfig config =
- new DefaultStageConfig("decoderManager", createThreadPool(3));
+ new DefaultStageConfig("decoderManager", createThreadPool(3,
true));
DefaultDecoderManager decMan =
new DefaultDecoderManager(router, config, inetDb);
DecodeStageHandler handler = new DecodeStageHandler(decMan);
@@ -148,7 +150,7 @@
InetServicesDatabase inetDb)
{
DefaultStageConfig config =
- new DefaultStageConfig("encoderManager", createThreadPool(3));
+ new DefaultStageConfig("encoderManager", createThreadPool(3,
false));
DefaultEncoderManager encMan =
new DefaultEncoderManager(router, config, inetDb);
EncodeStageHandler handler = new EncodeStageHandler(encMan);
@@ -176,7 +178,7 @@
private OutputManager createOutputManager(EventRouter router)
{
DefaultStageConfig config =
- new DefaultStageConfig("outputManager", createThreadPool(3));
+ new DefaultStageConfig("outputManager", createThreadPool(3, true));
TCPOutputManager outMan = new TCPOutputManager(router, config);
outMan.start();
return outMan;
@@ -187,7 +189,7 @@
InetServicesDatabase
inetDb)
{
DefaultStageConfig config =
- new DefaultStageConfig("requestProcessor", createThreadPool(3));
+ new DefaultStageConfig("requestProcessor", createThreadPool(3,
false));
DefaultRequestProcessor reqProc =
new DefaultRequestProcessor(router, config, inetDb);
reqProc.start();
Modified:
incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java
==============================================================================
---
incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java
(original)
+++
incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java
Sun Oct 31 00:50:58 2004
@@ -18,7 +18,6 @@
package org.apache.seda.decoder;
import java.nio.ByteBuffer;
-
import java.util.EventObject;
import java.util.HashMap;
import java.util.Map;
@@ -27,7 +26,18 @@
import org.apache.commons.codec.stateful.DecoderCallback;
import org.apache.commons.codec.stateful.DecoderFactory;
import org.apache.commons.codec.stateful.StatefulDecoder;
-import org.apache.seda.event.*;
+import org.apache.seda.event.AbstractSubscriber;
+import org.apache.seda.event.AddProtocolEvent;
+import org.apache.seda.event.ConnectEvent;
+import org.apache.seda.event.ConnectSubscriber;
+import org.apache.seda.event.DisconnectEvent;
+import org.apache.seda.event.DisconnectSubscriber;
+import org.apache.seda.event.EventRouter;
+import org.apache.seda.event.InputEvent;
+import org.apache.seda.event.InputSubscriber;
+import org.apache.seda.event.ProtocolEvent;
+import org.apache.seda.event.ProtocolSubscriber;
+import org.apache.seda.event.RequestEvent;
import org.apache.seda.listener.ClientKey;
import org.apache.seda.listener.KeyExpiryException;
import org.apache.seda.protocol.InetServicesDatabase;
@@ -76,7 +86,7 @@
this.router = router;
this.inetdb = inetdb;
this.monitor = new DecoderManagerMonitorAdapter();
- super.setMonitor(new LoggingStageMonitor(getClass()));
+ super.setStageMonitor(new LoggingStageMonitor(getClass()));
router.subscribe(InputEvent.class, this);
router.subscribe(ConnectEvent.class, this);
Modified:
incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
==============================================================================
---
incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
(original)
+++
incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
Sun Oct 31 00:50:58 2004
@@ -65,12 +65,11 @@
*
* @param router the event router used to publish and subscribe to events
on
*/
- public DefaultEncoderManager(
- EventRouter router, StageConfig config,
+ public DefaultEncoderManager(EventRouter router, StageConfig config,
InetServicesDatabase inetdb)
{
super(config);
- super.setMonitor(new LoggingStageMonitor(this.getClass()));
+ super.setStageMonitor(new LoggingStageMonitor(this.getClass()));
monitor = new EncoderManagerMonitorAdapter();
this.inetdb = inetdb;
this.router = router;
@@ -118,8 +117,7 @@
{
if (event instanceof AddProtocolEvent)
{
- factories.put(
- event.getProtocolProvider().getName(),
+ factories.put(event.getProtocolProvider().getName(),
event.getProtocolProvider().getEncoderFactory());
}
}
@@ -135,7 +133,12 @@
throws KeyExpiryException
{
String proto = inetdb.getProtoByPort(key.getLocalAddress().getPort());
- EncoderFactory factory = (EncoderFactory) factories.get(proto);
+ EncoderFactory factory;
+
+ // FIXME Event synchronization
+ while ((factory = (EncoderFactory) factories.get(proto)) == null)
+ continue;
+
return factory.createEncoder();
}
@@ -166,8 +169,7 @@
*/
encoder.setCallback(new EncoderCallback()
{
- public void encodeOccurred(
- StatefulEncoder encoder,
+ public void encodeOccurred(StatefulEncoder encoder,
Object encoded)
{
ClientKey key = ((ClientEncoder) encoder).getClientKey();
@@ -202,8 +204,7 @@
final Object[] encoded = new Object[1];
encoder.setCallback(new EncoderCallback()
{
- public void encodeOccurred(
- StatefulEncoder encoder, Object obj)
+ public void encodeOccurred(StatefulEncoder encoder, Object obj)
{
encoded[0] = obj;
}
@@ -214,8 +215,8 @@
// the encoded value should be set
if (encoded[0] == null)
{
- throw new EncoderException("Expected a complete encoded object"
- + " but encoder did not produce one");
+ throw new EncoderException("Expected a complete encoded object" +
+ " but encoder did not produce one");
}
return (ByteBuffer) encoded[0];
Modified:
incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/EncodeStageHandler.java
==============================================================================
---
incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/EncodeStageHandler.java
(original)
+++
incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/EncodeStageHandler.java
Sun Oct 31 00:50:58 2004
@@ -50,7 +50,11 @@
{
ResponseEvent re = (ResponseEvent) event;
ClientKey key = re.getClientKey();
- StatefulEncoder encoder = encMan.getEncoder(key);
+ StatefulEncoder encoder;
+
+ // FIXME Event synchronization issues
+ while ((encoder = encMan.getEncoder(key)) == null)
+ continue;
try
{
Modified:
incubator/directory/seda/trunk/src/java/org/apache/seda/output/TCPOutputManager.java
==============================================================================
---
incubator/directory/seda/trunk/src/java/org/apache/seda/output/TCPOutputManager.java
(original)
+++
incubator/directory/seda/trunk/src/java/org/apache/seda/output/TCPOutputManager.java
Sun Oct 31 00:50:58 2004
@@ -18,10 +18,8 @@
package org.apache.seda.output;
import java.io.IOException;
-
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
-
import java.util.EventObject;
import java.util.HashMap;
import java.util.Map;
@@ -81,7 +79,7 @@
this.router.subscribe(ConnectEvent.class, this);
this.router.subscribe(DisconnectEvent.class, this);
config.setHandler(new OutputStageHandler());
- this.setMonitor(new LoggingStageMonitor());
+ this.setStageMonitor(new LoggingStageMonitor());
this.setOutputMonitor(new LoggingOutputMonitor());
}
@@ -213,8 +211,7 @@
}
catch (IOException e)
{
- monitor.failedOnWrite(
- TCPOutputManager.this,
+ monitor.failedOnWrite(TCPOutputManager.this,
event.getClientKey(), e);
}
}
Modified:
incubator/directory/seda/trunk/src/java/org/apache/seda/output/UDPOutputManager.java
==============================================================================
---
incubator/directory/seda/trunk/src/java/org/apache/seda/output/UDPOutputManager.java
(original)
+++
incubator/directory/seda/trunk/src/java/org/apache/seda/output/UDPOutputManager.java
Sun Oct 31 00:50:58 2004
@@ -81,7 +81,7 @@
this.router.subscribe(ConnectEvent.class, this);
this.router.subscribe(DisconnectEvent.class, this);
config.setHandler(new OutputStageHandler());
- this.setMonitor(new LoggingStageMonitor());
+ this.setStageMonitor(new LoggingStageMonitor());
this.setOutputMonitor(new LoggingOutputMonitor());
}
Modified:
incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/DefaultRequestProcessor.java
==============================================================================
---
incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/DefaultRequestProcessor.java
(original)
+++
incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/DefaultRequestProcessor.java
Sun Oct 31 00:50:58 2004
@@ -56,7 +56,7 @@
DefaultStageConfig defaultConfig = (DefaultStageConfig) config;
defaultConfig.setHandler(new ProcessorStageHandler());
- super.setMonitor(new LoggingStageMonitor(getClass()));
+ super.setStageMonitor(new LoggingStageMonitor(getClass()));
this.inetDb = inetDb;
this.router = router;
Modified:
incubator/directory/seda/trunk/src/java/org/apache/seda/stage/DefaultStage.java
==============================================================================
---
incubator/directory/seda/trunk/src/java/org/apache/seda/stage/DefaultStage.java
(original)
+++
incubator/directory/seda/trunk/src/java/org/apache/seda/stage/DefaultStage.java
Sun Oct 31 00:50:58 2004
@@ -22,6 +22,9 @@
import java.util.LinkedList;
import java.util.Set;
+import org.apache.seda.event.ClientEvent;
+import org.apache.seda.listener.ClientKey;
+
/**
* The default Stage implementation.
@@ -204,7 +207,7 @@
*
* @param monitor the monitor to set for this Stage
*/
- public void setMonitor(StageMonitor monitor)
+ public void setStageMonitor(StageMonitor monitor)
{
this.monitor = monitor;
}
@@ -218,7 +221,7 @@
*
* @author <a href="mailto:[EMAIL PROTECTED]">Alex Karasulu</a>
* @author $Author$
- * @version $Revision$
+ * @version $Revision: 56106 $
*/
class StageDriver implements Runnable
{
@@ -253,7 +256,7 @@
}
catch (Exception e2)
{
- /*NOT THROWN*/
+ /*NOT THROWN*/
}
monitor.driverFailed(DefaultStage.this, e);
@@ -264,8 +267,12 @@
EventObject event = (EventObject) queue.removeLast();
monitor.eventDequeued(DefaultStage.this, event);
+ ClientKey key = null;
+ if (event instanceof ClientEvent) {
+ key = ((ClientEvent) event).getClientKey();
+ }
Runnable l_runnable = new ExecutableHandler(event);
- config.getThreadPool().execute(l_runnable);
+ config.getThreadPool().execute(l_runnable, key);
monitor.eventHandled(DefaultStage.this, event);
}
}
@@ -278,7 +285,7 @@
*
* @author <a href="mailto:[EMAIL PROTECTED]">Alex Karasulu</a>
* @author $Author$
- * @version $Revision$
+ * @version $Revision: 56106 $
*/
class ExecutableHandler implements Runnable
{
Added:
incubator/directory/seda/trunk/src/java/org/apache/seda/thread/AbstractThreadPool.java
==============================================================================
--- (empty file)
+++
incubator/directory/seda/trunk/src/java/org/apache/seda/thread/AbstractThreadPool.java
Sun Oct 31 00:50:58 2004
@@ -0,0 +1,325 @@
+/*
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/*
+ * @(#) $Id: AbstractThreadPooledEventDispatcher.java 169 2004-10-21 07:22:31Z trustin $
+ */
+package org.apache.seda.thread;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+
+/**
+ * A base class for thread-pooled stages.
+ * <p>
+ * The pool keeps a list of worker threads, each of which repeatedly fetches a
+ * task from its {@link RunnableQueue} and runs it. Subclasses decide which
+ * queue every worker reads from ({@link #newWorker()}) and into which queue a
+ * submitted task is put ({@link #execute(Runnable, Object)}).
+ *
+ * @author Trustin Lee (http://gleamynode.net/)
+ * @version $Rev: 169 $, $Date: 2004-10-21 16:22:31 +0900 (Thu, 21 Oct 2004) $
+ */
+public abstract class AbstractThreadPool implements ThreadPool
+{
+    /** Sentinel task: the worker that fetches it terminates. */
+    private static final Runnable FEWER_THREADS =
+        new Runnable()
+        {
+            public void run()
+            {
+            }
+        };
+
+    private String threadNamePrefix = "stage-";
+
+    /**
+     * Whether this pool is running. Volatile because {@link #isStarted()}
+     * reads it without holding the lock of this pool.
+     */
+    private volatile boolean started;
+
+    /** The workers forked by this pool and not yet asked to terminate. */
+    protected final List workers = new ArrayList();
+
+    /**
+     * <code>true</code> if the size of this pool can be changed even if it
+     * is already started.
+     */
+    protected final boolean canChangeSizeOnTheFly;
+    private int threadPoolSize = Runtime.getRuntime().availableProcessors();
+    private int threadPriority = Thread.NORM_PRIORITY;
+
+    /** Sequence number used to name the next worker thread. */
+    private int threadId = 0;
+
+    /**
+     * Creates a stopped pool sized to the number of available processors.
+     *
+     * @param canChangeSizeOnTheFly <code>true</code> if
+     *        {@link #setThreadPoolSize(int)} may be called while the pool is
+     *        started
+     */
+    protected AbstractThreadPool(boolean canChangeSizeOnTheFly)
+    {
+        this.canChangeSizeOnTheFly = canChangeSizeOnTheFly;
+    }
+
+    /**
+     * Starts this pool by forking {@link #getThreadPoolSize()} workers. Does
+     * nothing if the pool is already started.
+     */
+    public synchronized void start()
+    {
+        if (started)
+        {
+            return;
+        }
+
+        started = true;
+        forkThreads(threadPoolSize);
+    }
+
+    /**
+     * Stops this pool by asking every worker to terminate. A worker
+     * terminates after it has run the tasks queued ahead of the request.
+     * Does nothing if the pool is not started.
+     */
+    public synchronized void stop()
+    {
+        if (!started)
+        {
+            return;
+        }
+
+        forkThreads(-threadPoolSize);
+        started = false;
+    }
+
+    /**
+     * Returns <code>true</code> if this pool is started.
+     */
+    public boolean isStarted()
+    {
+        return started;
+    }
+
+    /**
+     * Returns the number of workers this pool runs while it is started.
+     */
+    public int getThreadPoolSize()
+    {
+        return threadPoolSize;
+    }
+
+    /**
+     * Changes the number of workers. If the pool is started, workers are
+     * forked or asked to terminate immediately to match the new size.
+     *
+     * @param newSize the new number of workers
+     * @throws IllegalArgumentException if <code>newSize</code> is not positive
+     * @throws IllegalStateException if the pool is started and its size
+     *         cannot be changed on the fly
+     */
+    public synchronized void setThreadPoolSize(int newSize)
+    {
+        if (newSize <= 0)
+        {
+            throw new IllegalArgumentException(
+                "newSize must be positive: " + newSize);
+        }
+
+        if (started)
+        {
+            if (!canChangeSizeOnTheFly)
+            {
+                throw new IllegalStateException(
+                    "The size of a started pool cannot be changed.");
+            }
+
+            forkThreads(newSize - threadPoolSize);
+        }
+
+        threadPoolSize = newSize;
+    }
+
+    /**
+     * Executes the specified task on one of the workers of this pool.
+     *
+     * @param runnable the task to execute
+     * @param hint an implementation-specific hint for choosing the worker
+     */
+    public abstract void execute(Runnable runnable, Object hint);
+
+    /**
+     * Forks <code>delta</code> new workers if <code>delta</code> is positive,
+     * or asks <code>-delta</code> workers to terminate if it is negative.
+     * Callers must hold the lock of this pool.
+     */
+    private void forkThreads(int delta)
+    {
+        for (; delta > 0; delta--)
+        {
+            workers.add(newWorker());
+        }
+
+        for (; delta < 0; delta++)
+        {
+            AbstractWorker worker =
+                (AbstractWorker) workers.remove(workers.size() - 1);
+            worker.localEventQueue.push(FEWER_THREADS);
+        }
+    }
+
+    /**
+     * Creates a new worker; the worker thread starts itself on construction.
+     */
+    protected abstract AbstractWorker newWorker();
+
+    /**
+     * Returns the priority assigned to the worker threads.
+     */
+    public int getThreadPriority()
+    {
+        return threadPriority;
+    }
+
+    /**
+     * Sets the priority of the worker threads, including those already
+     * running.
+     *
+     * @param newPriority the new priority
+     * @throws IllegalArgumentException if <code>newPriority</code> is not
+     *         between {@link Thread#MIN_PRIORITY} and
+     *         {@link Thread#MAX_PRIORITY}
+     */
+    public synchronized void setThreadPriority(int newPriority)
+    {
+        if ((newPriority < Thread.MIN_PRIORITY) ||
+            (newPriority > Thread.MAX_PRIORITY))
+        {
+            throw new IllegalArgumentException(
+                "newPriority out of range: " + newPriority);
+        }
+
+        this.threadPriority = newPriority;
+
+        if (isStarted())
+        {
+            Iterator it = workers.iterator();
+
+            while (it.hasNext())
+            {
+                AbstractWorker worker = (AbstractWorker) it.next();
+                worker.setPriority(newPriority);
+            }
+        }
+    }
+
+    /**
+     * Returns the prefix of the worker thread names.
+     */
+    public String getThreadNamePrefix()
+    {
+        return threadNamePrefix;
+    }
+
+    /**
+     * Sets the prefix of the names of the workers forked from now on.
+     *
+     * @param threadNamePrefix the new prefix
+     * @throws NullPointerException if <code>threadNamePrefix</code> is
+     *         <code>null</code>
+     */
+    public void setThreadNamePrefix(String threadNamePrefix)
+    {
+        if (threadNamePrefix == null)
+        {
+            throw new NullPointerException("threadNamePrefix");
+        }
+
+        this.threadNamePrefix = threadNamePrefix;
+    }
+
+    /**
+     * Returns the name of the next worker thread: the prefix, a dash (unless
+     * the prefix already ends with one, as the default does) and a sequence
+     * number.
+     */
+    private String nextThreadName()
+    {
+        String prefix = getThreadNamePrefix();
+
+        if (!prefix.endsWith("-"))
+        {
+            prefix = prefix + '-';
+        }
+
+        return prefix + threadId++;
+    }
+
+    /**
+     * A daemon thread that runs the tasks of one queue until it fetches the
+     * termination sentinel. It starts itself in its constructor, so
+     * subclasses must not depend on state they initialize after calling the
+     * super constructor.
+     */
+    protected abstract class AbstractWorker extends Thread
+    {
+        protected final RunnableQueue localEventQueue;
+
+        protected AbstractWorker(RunnableQueue eventQueue)
+        {
+            super(nextThreadName());
+
+            setPriority(getThreadPriority());
+            setDaemon(true);
+            this.localEventQueue = eventQueue;
+
+            super.start();
+        }
+
+        public final void run()
+        {
+            // Terminate only on the sentinel, not on isStarted(): a worker
+            // that quit because the pool was stopped would leave its sentinel
+            // in the queue, and with a shared queue that stale sentinel would
+            // terminate a worker forked by a later start().
+            for (;;)
+            {
+                Runnable runnable = localEventQueue.fetch();
+
+                if (runnable == FEWER_THREADS)
+                {
+                    break;
+                }
+
+                try
+                {
+                    runnable.run();
+                }
+                catch (Throwable t)
+                {
+                    // A failing task must not kill the worker.
+                    t.printStackTrace();
+                }
+            }
+        }
+    }
+}
Added:
incubator/directory/seda/trunk/src/java/org/apache/seda/thread/OrderedThreadPool.java
==============================================================================
--- (empty file)
+++
incubator/directory/seda/trunk/src/java/org/apache/seda/thread/OrderedThreadPool.java
Sun Oct 31 00:50:58 2004
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.seda.thread;
+
+import org.apache.seda.listener.ClientKey;
+
+
+/**
+ * A thread pool that preserves the execution order of tasks sharing the same
+ * {@link ClientKey} hint.
+ * <p>
+ * Every worker thread owns a private {@link RunnableQueue}. A task submitted
+ * with a <code>ClientKey</code> hint is always queued to the worker selected
+ * by the key's hash code, so tasks for one key run sequentially in submission
+ * order. Tasks submitted with any other hint (including <code>null</code>)
+ * are distributed over the workers in round-robin fashion.
+ * <p>
+ * The pool is created with <code>canChangeSizeOnTheFly == false</code>, so
+ * its size cannot be changed once it has been started.
+ *
+ * @author Trustin Lee
+ */
+public class OrderedThreadPool extends AbstractThreadPool
+{
+    /** Index of the worker that receives the next task without a key. */
+    private int nextWorkerIdx;
+
+    /**
+     * Creates a pool whose size is fixed once it has been started.
+     */
+    public OrderedThreadPool()
+    {
+        super(false);
+    }
+
+    /**
+     * Creates a worker thread that owns a private task queue.
+     */
+    protected AbstractWorker newWorker()
+    {
+        return new Worker();
+    }
+
+    /**
+     * Queues the specified task, starting the pool first if necessary.
+     *
+     * @param runnable the task to execute
+     * @param hint a {@link ClientKey} to keep the task ordered relative to
+     *             other tasks with the same key; any other value (or
+     *             <code>null</code>) selects a worker round-robin
+     */
+    public void execute(Runnable runnable, Object hint)
+    {
+        start();
+
+        Worker worker;
+        if (hint instanceof ClientKey)
+        {
+            worker = getWorker((ClientKey) hint);
+        }
+        else
+        {
+            worker = nextWorker();
+        }
+
+        worker.localEventQueue.push(runnable);
+    }
+
+    /**
+     * Returns the next worker in round-robin order.
+     */
+    private synchronized Worker nextWorker()
+    {
+        int workerIdx = nextWorkerIdx++;
+        nextWorkerIdx %= workers.size();
+        return (Worker) workers.get(workerIdx);
+    }
+
+    /**
+     * Returns the worker responsible for the specified key. Keys with the
+     * same hash code map to the same worker while the pool size is unchanged.
+     */
+    private Worker getWorker(ClientKey key)
+    {
+        int hash = System.identityHashCode(this) ^ key.hashCode();
+
+        // Clear the sign bit instead of calling Math.abs(): Math.abs() returns
+        // a negative value for Integer.MIN_VALUE, which would produce a
+        // negative (invalid) worker index.
+        int workerIdx = (hash & Integer.MAX_VALUE) % getThreadPoolSize();
+        return (Worker) workers.get(workerIdx);
+    }
+
+    /**
+     * A worker thread with its own private task queue.
+     */
+    private class Worker extends AbstractWorker
+    {
+        public Worker()
+        {
+            super(new RunnableQueue(16));
+        }
+    }
+}
Added:
incubator/directory/seda/trunk/src/java/org/apache/seda/thread/RunnableQueue.java
==============================================================================
--- (empty file)
+++
incubator/directory/seda/trunk/src/java/org/apache/seda/thread/RunnableQueue.java
Sun Oct 31 00:50:58 2004
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+/*
+ * @(#) $Id: EventQueue.java 169 2004-10-21 07:22:31Z trustin $
+ */
+package org.apache.seda.thread;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+
+/**
+ * A thread-safe FIFO queue of {@link Runnable}s backed by a circular array
+ * which doubles its length whenever it is full.
+ *
+ * @author Trustin Lee (http://gleamynode.net/dev/)
+ * @version $Rev: 169 $, $Date: 2004-10-21 16:22:31 +0900 (Thu, 21 Oct 2004) $
+ */
+class RunnableQueue implements Serializable {
+    /** Circular buffer that holds the queued items. */
+    private Runnable[] events;
+
+    /** Index of the oldest item, which is fetched next. */
+    private int first = 0;
+
+    /** Index of the slot the next pushed item is stored in. */
+    private int last = 0;
+
+    /** Number of queued items. */
+    private int size = 0;
+
+    /** Number of threads currently inside {@link #fetch()}. */
+    private int waitingForNewItem;
+
+    /**
+     * Constructs a new, empty queue with the specified initial capacity.
+     *
+     * @param initialCapacity the initial length of the backing array; may be
+     *        zero, in which case the array is allocated on the first push
+     */
+    public RunnableQueue(int initialCapacity) {
+        events = new Runnable[initialCapacity];
+    }
+
+    /**
+     * Clears this queue.
+     */
+    public synchronized void clear() {
+        Arrays.fill(events, null);
+        first = 0;
+        last = 0;
+        size = 0;
+    }
+
+    /**
+     * Fetches the oldest item from this queue, waiting until one is pushed
+     * if the queue is empty.
+     * <p>
+     * An interruption does not abort the wait, but the interrupt status of
+     * the calling thread is restored before this method returns.
+     *
+     * @return the fetched item, never <code>null</code>
+     */
+    public synchronized Runnable fetch() {
+        Runnable e;
+        boolean interrupted = false;
+        waitingForNewItem++;
+
+        try {
+            while ((e = fetchNow0()) == null) {
+                try {
+                    wait();
+                } catch (InterruptedException ex) {
+                    // Keep waiting, but remember the interruption so that
+                    // the caller can still observe it.
+                    interrupted = true;
+                }
+            }
+        } finally {
+            waitingForNewItem--;
+
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        return e;
+    }
+
+    /**
+     * Fetches the oldest item from this queue without waiting.
+     *
+     * @return the fetched item, or <code>null</code> if the queue is empty
+     */
+    public synchronized Runnable fetchNow() {
+        return fetchNow0();
+    }
+
+    /**
+     * Removes and returns the oldest item, or returns <code>null</code> if
+     * the queue is empty. Callers must hold the lock of this queue.
+     */
+    private Runnable fetchNow0() {
+        if (size == 0) {
+            return null;
+        }
+
+        Runnable event = events[first];
+        events[first++] = null;
+
+        if (first == events.length) {
+            first = 0;
+        }
+
+        size--;
+        return event;
+    }
+
+    /**
+     * Appends the specified item to this queue and wakes up one thread
+     * waiting in {@link #fetch()}, if any.
+     *
+     * @param event the item to enqueue
+     * @throws NullPointerException if <code>event</code> is
+     *         <code>null</code>; <code>null</code> is what the fetch methods
+     *         use to signal an empty queue
+     */
+    public synchronized void push(Runnable event) {
+        if (event == null) {
+            throw new NullPointerException("event");
+        }
+
+        if (size == events.length) {
+            // The buffer is full, so first == last. Move the items to a
+            // larger array in FIFO order: [first, oldLen) followed by the
+            // wrapped-around part [0, first). Math.max() lets a queue
+            // created with a capacity of zero grow as well.
+            final int oldLen = events.length;
+            Runnable[] newEvents = new Runnable[Math.max(1, oldLen * 2)];
+
+            System.arraycopy(events, first, newEvents, 0, oldLen - first);
+            System.arraycopy(events, 0, newEvents, oldLen - first, first);
+
+            first = 0;
+            last = oldLen;
+            events = newEvents;
+        }
+
+        events[last++] = event;
+
+        if (last == events.length) {
+            last = 0;
+        }
+
+        size++;
+
+        if (waitingForNewItem > 0) {
+            notify();
+        }
+    }
+
+    /**
+     * Returns <code>true</code> if the queue is empty.
+     */
+    public synchronized boolean isEmpty() {
+        return (size == 0);
+    }
+
+    /**
+     * Returns the number of elements in the queue.
+     */
+    public synchronized int size() {
+        return size;
+    }
+}
Added:
incubator/directory/seda/trunk/src/java/org/apache/seda/thread/SimpleThreadPool.java
==============================================================================
--- (empty file)
+++
incubator/directory/seda/trunk/src/java/org/apache/seda/thread/SimpleThreadPool.java
Sun Oct 31 00:50:58 2004
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.seda.thread;
+
+
+/**
+ * A thread pool whose workers all take tasks from one shared queue.
+ * <p>
+ * Tasks are dequeued in submission order, but because any idle worker may
+ * pick up the next task, tasks may run concurrently and complete out of order.
+ * The <code>hint</code> passed to {@link #execute(Runnable, Object)} is
+ * ignored. The pool is created with
+ * <code>canChangeSizeOnTheFly == true</code>, so it may be resized while
+ * it is running.
+ *
+ * @author Trustin Lee
+ */
+public class SimpleThreadPool extends AbstractThreadPool
+{
+    /** The queue shared by every worker of this pool. */
+    private final RunnableQueue globalEventQueue = new RunnableQueue(16);
+
+    /**
+     * Creates a pool whose size may be changed even after it has been started.
+     */
+    public SimpleThreadPool()
+    {
+        super(true);
+    }
+
+    /**
+     * Creates a worker thread that takes its tasks from the shared queue.
+     */
+    protected AbstractWorker newWorker()
+    {
+        return new Worker();
+    }
+
+    /**
+     * Queues the specified task, starting the pool first if necessary.
+     *
+     * @param event the task to execute
+     * @param hint ignored by this implementation
+     */
+    public void execute(Runnable event, Object hint)
+    {
+        start();
+        globalEventQueue.push(event);
+    }
+
+    /**
+     * A worker thread bound to the shared queue of the enclosing pool.
+     */
+    private class Worker extends AbstractWorker
+    {
+        public Worker()
+        {
+            super(globalEventQueue);
+        }
+    }
+}
Modified:
incubator/directory/seda/trunk/src/java/org/apache/seda/thread/ThreadPool.java
==============================================================================
---
incubator/directory/seda/trunk/src/java/org/apache/seda/thread/ThreadPool.java
(original)
+++
incubator/directory/seda/trunk/src/java/org/apache/seda/thread/ThreadPool.java
Sun Oct 31 00:50:58 2004
@@ -32,5 +32,5 @@
*
* @param runnable the runnable to execute
*/
- void execute(Runnable runnable);
+ void execute(Runnable runnable, Object hint);
}