Okay, I've completed the work for thread-management in the Freenet
server. This is an adaptation of the org.gamora.ethreads package for the
server. I've licensed it GPL and moved it into the Freenet source tree
(though it's not in CVS yet). It gives us the following advantages:
1) Anti-flood - Rejects connections when the maximum thread count is
reached
2) Efficiency management - Not so much with this release, but we can very
easily (one line change) pick different thread-management strategies
for heavily loaded servers
3) Faster connects - The threads are pre-allocated by thread pool, which
can save literally hundreds of instructions on many platforms during
connects.
There is a new configuration directive, 'maximumConnectionThreads', which if
defined and not zero enables thread management. If it's zero or undefined,
the old behavior is used. Could someone make sure this works on other
systems? Do we want this in CVS?
Scott
Here is the patch:
- - -
diff -urN Freenet/ConnectionHandler.java Freenet-new/ConnectionHandler.java
--- Freenet/ConnectionHandler.java Fri Apr 14 16:45:50 2000
+++ Freenet-new/ConnectionHandler.java Sun Apr 16 15:58:00 2000
@@ -1,5 +1,6 @@
package Freenet;
import Freenet.support.*;
+import Freenet.thread.*;
import java.net.*;
import java.io.*;
import java.io.EOFException;
@@ -16,7 +17,7 @@
* @author <a href="mailto:blanu at uts.cc.utexas.edu">Brandon Wiley</a>
**/
-public class ConnectionHandler extends Thread
+public class ConnectionHandler implements ERunnable
{
// Protected/Private Fields
private static long ids=0;
@@ -26,6 +27,7 @@
private volatile Long pongs = new Long(0);
private boolean closed;
private boolean waiting = true;
+ private Thread exec_instance;
protected class ConnectionCB implements Callback {
public void callback() {
@@ -43,6 +45,15 @@
}
+ public void start() {
+ exec_instance=new Thread(this);
+ exec_instance.start();
+ }
+
+ public void setExecutionInstance(EThread e) {
+ exec_instance=e.getThread();
+ }
+
// Public Methods
public void run() {
PushbackInputStream in = new PushbackInputStream(c.in);
@@ -173,7 +184,7 @@
**/
public boolean isOpen() {
- return isAlive() && !closed;
+ return exec_instance.isAlive() && !closed;
}
public Address peer() {
diff -urN Freenet/Core.java Freenet-new/Core.java
--- Freenet/Core.java Thu Apr 13 14:02:03 2000
+++ Freenet-new/Core.java Sun Apr 16 16:07:42 2000
@@ -1,6 +1,7 @@
package Freenet;
import Freenet.support.*;
import Freenet.message.HandshakeRequest;
+import Freenet.thread.*;
import java.util.*;
import java.net.*;
import java.io.*;
@@ -31,6 +32,7 @@
public static int handshakeTimeout;
public static int handshakeLife;
public static boolean tunneling;
+ public static int maximumConnectionThreads;
public static Ticker timer;
public ListeningAddress myAddress;
@@ -52,6 +54,14 @@
Connection conn;
RawMessage m;
ConnectionHandler c;
+ ThreadManager threadManager=null;
+
+ if (maximumConnectionThreads>0) {
+ threadManager=new ThreadPool(maximumConnectionThreads,
+ maximumConnectionThreads);
+ threadManager.start();
+ }
+
while(listen)
{
try
@@ -64,7 +74,13 @@
throw new RuntimeException("Problem accepting next connection");
}
c = new ConnectionHandler(conn, mh);
- c.start();
+ if (maximumConnectionThreads>0) {
+ if (!threadManager.run(c)) { //Try to run the thread in the pool
+ Logger.log("Node.java","Rejected connection:"+conn+
+ " (connection limit reached)",Logger.ERROR);
+ c.forceClose();
+ }
+ } else c.start(); // No thread management, start it.
}
}
diff -urN Freenet/node/Node.java Freenet-new/node/Node.java
--- Freenet/node/Node.java Thu Apr 13 14:02:03 2000
+++ Freenet-new/node/Node.java Sun Apr 16 14:21:08 2000
@@ -106,6 +106,8 @@
connectTimeout = params.getint("connectTimeout",30000);
handshakeTimeout = params.getint("handshakeTimeout",30000);
handshakeLife = params.getint("handshakeLife",10000000);
+ maximumConnectionThreads = params.getint("maximumConnectionThreads",0);
+
if (!tunneling)
Logger.log("Node.java","Tunneling turned off",Logger.DEBUGGING);
diff -urN Freenet/scripts/sample.freenetrc Freenet-new/scripts/sample.freenetrc
--- Freenet/scripts/sample.freenetrc Wed Apr 12 10:25:05 2000
+++ Freenet-new/scripts/sample.freenetrc Sun Apr 16 16:17:33 2000
@@ -52,6 +52,10 @@
# How long before a handshake expires (in milliseconds)
handshakeLife=100000
+# Should we use thread-management? If this number is defined and non-zero,
+# this specifies how many inbound connections can be active at once.
+# maximumConnectionThreads=50
+
# The error reporting threshold, one of:
# Error: Errors only
# Normal: Report significant events
diff -urN Freenet/support/BlockingQueue.java Freenet-new/support/BlockingQueue.java
--- Freenet/support/BlockingQueue.java Wed Dec 31 19:00:00 1969
+++ Freenet-new/support/BlockingQueue.java Sun Apr 16 15:54:27 2000
@@ -0,0 +1,60 @@
+package Freenet.support;
+
+import java.util.*;
+
+/*
+ This code is part of the Java Adaptive Network Client by Ian Clarke.
+ It is distributed under the GNU Public Licence (GPL) version 2. See
+ http://www.gnu.org/ for further details of the GPL.
+*/
+
+/**
+ * Implements a Blocking Queue, that is a FIFO (First In First Out) buffer
+ * with the property that calls to dequeue() on an empty queue will block
+ * until an element becomes available to dequeue.
+ *
+ * @author Scott G. Miller <scgmille at indiana.edu>
+ */
+public class BlockingQueue {
+ private Vector queue;
+
+ public BlockingQueue() {
+ queue=new Vector();
+ }
+
+ /**
+ * Enqueues an object onto the queue.
+ */
+ public void enqueue(Object o) {
+ synchronized(queue) {
+ queue.addElement(o);
+ queue.notify();
+ }
+ }
+
+ /**
+ * Dequeues an object from the queue. This method will return the
+ * next element in the queue, or block until one is available.
+ */
+ public Object dequeue() throws InterruptedException {
+ synchronized(queue) {
+ if (queue.isEmpty()) {
+ queue.wait();
+ return dequeue();
+ } else {
+ Object tmp=queue.firstElement();
+ queue.removeElementAt(0);
+ return tmp;
+ }
+ }
+ }
+
+ public boolean isEmpty() {
+ return queue.isEmpty();
+ }
+
+ public int size() {
+ return queue.size();
+ }
+}
+
diff -urN Freenet/thread/ERunnable.java Freenet-new/thread/ERunnable.java
--- Freenet/thread/ERunnable.java Wed Dec 31 19:00:00 1969
+++ Freenet-new/thread/ERunnable.java Sun Apr 16 14:41:45 2000
@@ -0,0 +1,23 @@
+package Freenet.thread;
+
+/*
+ This code is part of the Java Adaptive Network Client by Ian Clarke.
+ It is distributed under the GNU Public Licence (GPL) version 2. See
+ http://www.gnu.org/ for further details of the GPL.
+*/
+
+/**
+ * Defines a subclass of Runnable that will be informed of the
+ * EThread that is hosting this Runnable once it is assigned
+ * a thread
+ */
+public interface ERunnable extends Runnable {
+
+ /**
+ * Called by the thread-manager when this ERunnable actually
+ * gets a thread and begins executing. This method will be called
+ * just prior to the run-method of the runnable.
+ */
+ public void setExecutionInstance(EThread e);
+
+}
diff -urN Freenet/thread/EThread.java Freenet-new/thread/EThread.java
--- Freenet/thread/EThread.java Wed Dec 31 19:00:00 1969
+++ Freenet-new/thread/EThread.java Sun Apr 16 16:11:47 2000
@@ -0,0 +1,58 @@
+package Freenet.thread;
+
+/*
+ This code is part of the Java Adaptive Network Client by Ian Clarke.
+ It is distributed under the GNU Public Licence (GPL) version 2. See
+ http://www.gnu.org/ for further details of the GPL.
+*/
+
+/**
+ * This is the atomic element of the e-threads package. An EThread is an
+ * extended thread, it should only be used in conjunction with a ThreadManager.
+ * e-threads cooperate with the ThreadManager to provide additional
+ * helpful behaviors such as thread recycling, pooling, and priority management
+ *
+ * @author Scott G. Miller
+ */
+public class EThread implements Runnable {
+ protected Runnable code;
+ protected ThreadManager host;
+ protected Thread executor;
+
+ public EThread(ThreadPool host) {
+ super();
+ this.host=host;
+ }
+
+ public Thread getThread() {
+ return executor;
+ }
+
+ public ThreadManager getHost() {
+ return host;
+ }
+
+ protected void begin() {
+ executor=new Thread(this);
+ executor.start();
+ }
+
+ protected void setCode(Runnable r) {
+ code=r;
+ }
+
+ public void run() {
+ if (code instanceof ERunnable)
+ ((ERunnable)code).setExecutionInstance(this);
+ try {
+ code.run();
+ } catch (Throwable t) {
+ } finally {
+ host.reclaim(this);
+ }
+ }
+
+ public String toString() {
+ return "EThread["+executor+','+code+','+host+']';
+ }
+}
diff -urN Freenet/thread/Test.java Freenet-new/thread/Test.java
--- Freenet/thread/Test.java Wed Dec 31 19:00:00 1969
+++ Freenet-new/thread/Test.java Sun Apr 16 14:42:38 2000
@@ -0,0 +1,42 @@
+
+
+public class Test implements Runnable {
+
+ static int val=0;
+
+ public static void main(String[] args) throws Exception {
+ Freenet.thread.ThreadManager p=new Freenet.thread.ThreadPool(40);
+ p.start();
+
+ long start=System.currentTimeMillis();
+ for (int x=0; x<10000; x++) {
+ // new Test().run(); //Linear runqueue
+ //new Thread(new Test()).start(); //Uncontrolled spawning
+ p.run(new Test()); //40 count thread pool
+ }
+ System.err.println("Calling flush()...");
+ p.flush();
+ System.err.println("Flush complete");
+ long end=System.currentTimeMillis();
+ System.err.println("Elapsed: "+(end-start));
+ p.halt();
+ System.err.println("Value: "+val);
+ System.err.println("Probably less than 10000, since some threads are still running");
+ }
+
+ private static int gid=0;
+ private int id;
+
+ public Test() {
+ id=gid++;
+ }
+
+ public void run() {
+ try{ Thread.sleep((int)(Math.random()*50));} catch (InterruptedException e) {}
+ val++;
+ }
+ public String toString() {
+ return "["+id+"]";
+ }
+
+}
diff -urN Freenet/thread/ThreadManager.java Freenet-new/thread/ThreadManager.java
--- Freenet/thread/ThreadManager.java Wed Dec 31 19:00:00 1969
+++ Freenet-new/thread/ThreadManager.java Sun Apr 16 14:42:57 2000
@@ -0,0 +1,59 @@
+package Freenet.thread;
+
+/*
+ This code is part of the Java Adaptive Network Client by Ian Clarke.
+ It is distributed under the GNU Public Licence (GPL) version 2. See
+ http://www.gnu.org/ for further details of the GPL.
+*/
+
+/**
+ * Defines the interface for all thread-models in the extended threads
+ * package. Currently, only thread-pool implements this interface
+ *
+ * @author Scott G. Miller
+ */
+public interface ThreadManager {
+
+ /**
+ * Requests that this thread-manager run the given job (a class
+ * implementing the java.lang.Runnable interface). The exact
+ * manner in which the job is run is dependent on the ThreadManager
+ * implementation.
+ *
+ * @arg job The job to run
+ * @return True if the job was accepted, false if the job cannot
+ * be run (due possibly to the thread-manager having been halted)
+ */
+ public boolean run(Runnable job);
+
+ /**
+ * Called by the E-Thread to return itself to the ThreadManager
+ * after a completed job.
+ */
+ public void reclaim(EThread e);
+
+ /**
+ * Begins operation of the thread-manager. Should be called
+ * shortly after instantiation.
+ */
+ public void start();
+
+ /**
+ * Requests that the given thread-manager cease its operations. Any
+ * running threads managed by this ThreadManager will continue
+ * to run until completion, but no new jobs will run. This should
+ * probably be preceded by flush(), otherwise pending jobs may be lost.
+ */
+ public void halt();
+
+ /**
+ * Blocks until all pending jobs have been assigned to threads.
+ * When this method returns, all the jobs that were given to this
+ * thread-manager via the run(Runnable) method have either completed
+ * or are in currently executing threads (i.e. all pending jobs *will*
+ * be run)
+ */
+ public void flush();
+}
+
+
diff -urN Freenet/thread/ThreadPool.java Freenet-new/thread/ThreadPool.java
--- Freenet/thread/ThreadPool.java Wed Dec 31 19:00:00 1969
+++ Freenet-new/thread/ThreadPool.java Sun Apr 16 16:14:06 2000
@@ -0,0 +1,129 @@
+package Freenet.thread;
+
+/*
+ This code is part of the Java Adaptive Network Client by Ian Clarke.
+ It is distributed under the GNU Public Licence (GPL) version 2. See
+ http://www.gnu.org/ for further details of the GPL.
+*/
+
+import java.util.*;
+import Freenet.support.*;
+
+/**
+ * Provides a bounded pool of threads and a means to request that jobs
+ * be run on them.
+ *
+ * A Thread pool represents a set of threads that are made available
+ * for processing N jobs simultaneously, where N is the size of the pool.
+ * If more jobs are inserted into the thread pool, they are put on hold
+ * until one of the N running jobs completes, at which point the job
+ * may run.
+ *
+ * In this way one can bound the number of concurrently executing threads
+ * to a manageable amount, which improves the efficiency of running jobs
+ * and prevents a system from being brought down with an overload of tasks
+ * (such as in a Denial of Service attack). Unlike unmanaged spawning
+ * of threads (1:1 thread to task), a thread pool will efficiently work on
+ * N jobs at once, putting additional requests on hold.
+ *
+ * @author Scott G. Miller
+ */
+public class ThreadPool extends Thread implements ThreadManager {
+ protected int maxThreads, maxJobs;
+ protected BlockingQueue jobs;
+ protected BlockingQueue threads;
+ protected boolean run=true;
+
+ /**
+ * Creates a Thread Pool with a fixed number of concurrently
+ * executable threads. This also allocates a job queue, so that
+ * new jobs can be inserted in a non-blocking manner even if all
+ * N threads are occupied.
+ *
+ * @param maxThreads The number of threads that can be active at any
+ * one time
+ */
+ public ThreadPool(int maxThreads) {
+ this(maxThreads, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Creates a Thread Pool with a fixed number of concurrently
+ * executable threads. This also allocates a job queue, so that
+ * new jobs can be inserted in a non-blocking manner even if all
+ * N threads are occupied, but only up to maxJobs may be
+ * running OR enqueued at once.
+ *
+ * @param maxThreads The number of threads that can be active at any
+ * one time
+ * @param maxJobs The maximum number of allowable jobs. run() begins
+ * returning false and turning away jobs at this point
+ */
+ public ThreadPool(int maxThreads, int maxJobs) {
+ jobs=new BlockingQueue();
+ threads=new BlockingQueue();
+ this.maxThreads=maxThreads;
+ this.maxJobs=maxJobs;
+ fillThreadqueue();
+ }
+
+ //(Re)fills the thread-queue with maxThreads threads
+ private void fillThreadqueue() {
+ while (threads.size()<maxThreads) {
+ threads.enqueue(new EThread(this));
+ }
+ }
+
+ /**
+ * Called by the EThread to indicate that it has finished processing
+ * a job and may be reclaimed for further use
+ */
+ public void reclaim(EThread e) {
+ threads.enqueue(e);
+ }
+
+ private int runningThreads() {
+ return maxThreads-threads.size();
+ }
+
+ public boolean run(Runnable r) {
+ if (run) {
+ // System.err.println("["+runningThreads()+','+jobs.size()+']');
+ if (runningThreads() + jobs.size() < maxJobs) {
+ jobs.enqueue(r);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public void flush() {
+ while (!jobs.isEmpty()) {
+ synchronized(jobs) {
+ try {
+ jobs.wait();
+ } catch (InterruptedException ie) {}
+ }
+ }
+ }
+
+ public void halt() {
+ run=false;
+ interrupt();
+ }
+
+ public void run() {
+ while (run) {
+ try {
+ EThread host=(EThread)threads.dequeue();
+ Runnable job=(Runnable)jobs.dequeue();
+ host.setCode(job);
+ host.begin();
+ synchronized(jobs) {
+ jobs.notifyAll();
+ }
+ } catch (InterruptedException ie) {}
+ }
+ }
+}
+
diff -urN Freenet/thread/Wakeable.java Freenet-new/thread/Wakeable.java
--- Freenet/thread/Wakeable.java Wed Dec 31 19:00:00 1969
+++ Freenet-new/thread/Wakeable.java Sun Apr 16 14:44:24 2000
@@ -0,0 +1,18 @@
+package Freenet.thread;
+
+/*
+ This code is part of the Java Adaptive Network Client by Ian Clarke.
+ It is distributed under the GNU Public Licence (GPL) version 2. See
+ http://www.gnu.org/ for further details of the GPL.
+*/
+
+/**
+ * Future thread-managers will be able to wake individual threads when
+ * a waking condition is met. This will eliminate the need for
+ * while (!wakeable) { sleep(50); } type loops
+ */
+public interface Wakeable extends Runnable {
+
+ public abstract boolean wake();
+
+}
_______________________________________________
Freenet-dev mailing list
Freenet-dev at lists.sourceforge.net
http://lists.sourceforge.net/mailman/listinfo/freenet-dev