Author: toad
Date: 2006-01-25 14:55:11 +0000 (Wed, 25 Jan 2006)
New Revision: 7919
Modified:
branches/async-client/src/freenet/client/async/ClientRequestScheduler.java
branches/async-client/src/freenet/node/Node.java
branches/async-client/src/freenet/node/RequestStarter.java
branches/async-client/src/freenet/support/RandomGrabArray.java
branches/async-client/src/freenet/support/RandomGrabArrayWithInt.java
branches/async-client/src/freenet/support/SimpleIntNumberedItemComparator.java
branches/async-client/src/freenet/support/SortedVectorByNumber.java
Log:
Works for sub-1-block uncompressed CHK inserts and retrieves.
Modified: branches/async-client/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/async-client/src/freenet/client/async/ClientRequestScheduler.java	2006-01-25 14:18:06 UTC (rev 7918)
+++ branches/async-client/src/freenet/client/async/ClientRequestScheduler.java	2006-01-25 14:55:11 UTC (rev 7919)
@@ -2,6 +2,7 @@
import freenet.crypt.RandomSource;
import freenet.node.RequestStarter;
+import freenet.support.Logger;
import freenet.support.RandomGrabArrayWithInt;
import freenet.support.SortedVectorByNumber;
@@ -31,17 +32,17 @@
this.random = random;
this.isInsertScheduler = forInserts;
priorities = new
SortedVectorByNumber[RequestStarter.NUMBER_OF_PRIORITY_CLASSES];
- for(int i=0;i<priorities.length;i++)
- priorities[i] = new SortedVectorByNumber();
}
public void register(SendableRequest req) {
+ Logger.minor(this, "Registering "+req);
synchronized(this) {
if((!isInsertScheduler) && req instanceof ClientPutter)
throw new IllegalArgumentException("Expected a
ClientPut: "+req);
RandomGrabArrayWithInt grabber =
makeGrabArray(req.getPriorityClass(),
req.getRetryCount());
grabber.add(req);
+ Logger.minor(this, "Registered "+req+" on
prioclass="+req.getPriorityClass()+", retrycount="+req.getRetryCount());
}
synchronized(starter) {
starter.notifyAll();
@@ -56,8 +57,9 @@
}
RandomGrabArrayWithInt grabber = (RandomGrabArrayWithInt)
prio.get(retryCount);
if(grabber == null) {
- grabber = new RandomGrabArrayWithInt(retryCount);
+ grabber = new RandomGrabArrayWithInt(random,
retryCount);
prio.add(grabber);
+ Logger.minor(this, "Registering retry count
"+retryCount+" with prioclass "+priorityClass);
}
return grabber;
}
@@ -89,8 +91,16 @@
// Priorities start at 0
for(int i=0;i<RequestStarter.MINIMUM_PRIORITY_CLASS;i++) {
SortedVectorByNumber s = priorities[i];
- if(s == null) continue;
+ if(s == null) {
+ Logger.minor(this, "Priority "+i+" is null");
+ continue;
+ }
RandomGrabArrayWithInt rga = (RandomGrabArrayWithInt)
s.getFirst(); // will discard finished items
+ if(rga == null) {
+ Logger.minor(this, "No retrycount's in priority
"+i);
+ priorities[i] = null;
+ continue;
+ }
SendableRequest req = (SendableRequest)
rga.removeRandom();
if(rga.isEmpty()) {
s.remove(rga.getNumber());
@@ -98,8 +108,14 @@
priorities[i] = null;
}
}
+ if(req == null) {
+ Logger.minor(this, "No requests in priority
"+i+", retrycount "+rga.getNumber());
+ continue;
+ }
+ Logger.minor(this, "removeFirst() returning "+req);
return req;
}
+ Logger.minor(this, "No requests to run");
return null;
}
}
Modified: branches/async-client/src/freenet/node/Node.java
===================================================================
--- branches/async-client/src/freenet/node/Node.java	2006-01-25 14:18:06 UTC (rev 7918)
+++ branches/async-client/src/freenet/node/Node.java	2006-01-25 14:55:11 UTC (rev 7919)
@@ -455,12 +455,14 @@
requestThrottle = new RequestThrottle(5000, 2.0F);
requestStarter = new RequestStarter(this, requestThrottle,
"Request starter ("+portNumber+")");
fetchScheduler = new ClientRequestScheduler(false, random,
requestStarter);
+ requestStarter.setScheduler(fetchScheduler);
requestStarter.start();
//insertThrottle = new ChainedRequestThrottle(10000, 2.0F,
requestThrottle);
// FIXME reenable the above
insertThrottle = new RequestThrottle(10000, 2.0F);
insertStarter = new RequestStarter(this, insertThrottle,
"Insert starter ("+portNumber+")");
putScheduler = new ClientRequestScheduler(true, random,
insertStarter);
+ insertStarter.setScheduler(putScheduler);
insertStarter.start();
if(testnetHandler != null)
testnetHandler.start();
Modified: branches/async-client/src/freenet/node/RequestStarter.java
===================================================================
--- branches/async-client/src/freenet/node/RequestStarter.java	2006-01-25 14:18:06 UTC (rev 7918)
+++ branches/async-client/src/freenet/node/RequestStarter.java	2006-01-25 14:55:11 UTC (rev 7919)
@@ -41,6 +41,7 @@
final RequestThrottle throttle;
RequestScheduler sched;
final Node node;
+ private long sentRequestTime;
public RequestStarter(Node node, RequestThrottle throttle, String name)
{
this.node = node;
@@ -64,43 +65,50 @@
return name;
}
- public void run() {
- long sentRequestTime = System.currentTimeMillis();
- while(true) {
- SendableRequest req = sched.removeFirst();
- if(req != null) {
- // Create a thread to handle starting the
request, and the resulting feedback
- Thread t = new Thread(new SenderThread(req));
- t.setDaemon(true);
- t.start();
- // Wait
- long delay = throttle.getDelay();
- long sleepUntil = sentRequestTime + delay;
- long now;
- do {
- now = System.currentTimeMillis();
- if(now < sleepUntil)
- try {
- Thread.sleep(sleepUntil
- now);
- } catch (InterruptedException
e) {
- // Ignore
- }
- } while(now < sleepUntil);
- } else {
- synchronized(this) {
- // Always take the lock on
RequestStarter first.
- req = sched.removeFirst();
- if(req != null) continue;
+ void realRun() {
+ SendableRequest req = sched.removeFirst();
+ if(req != null) {
+ // Create a thread to handle starting the request, and
the resulting feedback
+ Thread t = new Thread(new SenderThread(req));
+ t.setDaemon(true);
+ t.start();
+ // Wait
+ long delay = throttle.getDelay();
+ long sleepUntil = sentRequestTime + delay;
+ long now;
+ do {
+ now = System.currentTimeMillis();
+ if(now < sleepUntil)
try {
- wait(1000);
+ Thread.sleep(sleepUntil - now);
} catch (InterruptedException e) {
// Ignore
}
+ } while(now < sleepUntil);
+ } else {
+ synchronized(this) {
+ // Always take the lock on RequestStarter first.
+ req = sched.removeFirst();
+ if(req != null) return;
+ try {
+ wait(1000);
+ } catch (InterruptedException e) {
+ // Ignore
}
}
}
}
+ public void run() {
+ while(true) {
+ try {
+ realRun();
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t, t);
+ }
+ }
+ }
+
private class SenderThread implements Runnable {
private final SendableRequest req;
Modified: branches/async-client/src/freenet/support/RandomGrabArray.java
===================================================================
--- branches/async-client/src/freenet/support/RandomGrabArray.java	2006-01-25 14:18:06 UTC (rev 7918)
+++ branches/async-client/src/freenet/support/RandomGrabArray.java	2006-01-25 14:55:11 UTC (rev 7919)
@@ -21,10 +21,20 @@
*/
private HashSet contents;
private final static int MIN_SIZE = 32;
+
+ public RandomGrabArray(RandomSource rand) {
+ this.reqs = new RandomGrabArrayItem[MIN_SIZE];
+ index = 0;
+ this.rand = rand;
+ contents = new HashSet();
+ }
public synchronized void add(RandomGrabArrayItem req) {
if(contents.contains(req)) return;
- if(req.isFinished()) return;
+ if(req.isFinished()) {
+ Logger.minor(this, "Is finished already: "+req);
+ return;
+ }
contents.add(req);
if(index >= reqs.length) {
RandomGrabArray[] r = new
RandomGrabArray[reqs.length*2];
Modified: branches/async-client/src/freenet/support/RandomGrabArrayWithInt.java
===================================================================
--- branches/async-client/src/freenet/support/RandomGrabArrayWithInt.java	2006-01-25 14:18:06 UTC (rev 7918)
+++ branches/async-client/src/freenet/support/RandomGrabArrayWithInt.java	2006-01-25 14:55:11 UTC (rev 7919)
@@ -1,10 +1,13 @@
package freenet.support;
+import freenet.crypt.RandomSource;
+
public class RandomGrabArrayWithInt extends RandomGrabArray implements
IntNumberedItem {
private final int number;
- public RandomGrabArrayWithInt(int no) {
+ public RandomGrabArrayWithInt(RandomSource rand, int no) {
+ super(rand);
number = no;
}
Modified: branches/async-client/src/freenet/support/SimpleIntNumberedItemComparator.java
===================================================================
--- branches/async-client/src/freenet/support/SimpleIntNumberedItemComparator.java	2006-01-25 14:18:06 UTC (rev 7918)
+++ branches/async-client/src/freenet/support/SimpleIntNumberedItemComparator.java	2006-01-25 14:55:11 UTC (rev 7919)
@@ -4,6 +4,12 @@
public class SimpleIntNumberedItemComparator implements Comparator {
+ private final boolean nullAtStart;
+
+ public SimpleIntNumberedItemComparator(boolean nullAtStart) {
+ this.nullAtStart = nullAtStart;
+ }
+
public int compare(Object o1, Object o2) {
int x = ocompare(o1, o2);
Logger.minor(this, "compare("+o1+","+o2+") = "+x);
@@ -15,9 +21,9 @@
if(o1 == null && o2 == null)
return 0; // null == null
if(o1 != null && o2 == null)
- return 1; // anything > null
+ return nullAtStart? -1 : 1; // anything > null
if(o2 != null && o1 == null)
- return -1;
+ return nullAtStart ? 1 : -1;
long i1, i2;
if(o1 instanceof IntNumberedItem)
i1 = ((IntNumberedItem)o1).getNumber();
Modified: branches/async-client/src/freenet/support/SortedVectorByNumber.java
===================================================================
--- branches/async-client/src/freenet/support/SortedVectorByNumber.java	2006-01-25 14:18:06 UTC (rev 7918)
+++ branches/async-client/src/freenet/support/SortedVectorByNumber.java	2006-01-25 14:55:11 UTC (rev 7919)
@@ -11,8 +11,14 @@
private IntNumberedItem[] data;
private int length;
- private static final Comparator comparator = new
SimpleIntNumberedItemComparator();
+ private static final Comparator comparator = new
SimpleIntNumberedItemComparator(true);
+ private static final int MIN_SIZE = 32;
+ public SortedVectorByNumber() {
+ this.data = new IntNumberedItem[MIN_SIZE];
+ length = 0;
+ }
+
public synchronized IntNumberedItem getFirst() {
if(length == 0) return null;
return data[0];
@@ -36,8 +42,8 @@
System.arraycopy(data, x+1, data, x,
length-x-1);
data[length--] = null;
}
- if(length < 4*data.length) {
- IntNumberedItem[] newData = new
IntNumberedItem[length*2];
+ if(length < 4*data.length && length > MIN_SIZE) {
+ IntNumberedItem[] newData = new
IntNumberedItem[Math.max(length*2, MIN_SIZE)];
System.arraycopy(data, 0, newData, 0, length);
data = newData;
}
@@ -48,14 +54,17 @@
if(x >= 0) throw new IllegalArgumentException(); // already
exists
// insertion point
x = -x-1;
+ Logger.minor(this, "Insertion point: "+x);
// Move the data
if(length == data.length) {
+ Logger.minor(this, "Expanding from "+length+" to
"+length*2);
IntNumberedItem[] newData = new
IntNumberedItem[length*2];
System.arraycopy(data, 0, newData, 0, data.length);
}
if(x < length)
System.arraycopy(data, x, data, x+1, length-x);
data[x] = grabber;
+ length++;
}
}