Author: nextgens
Date: 2007-03-28 12:29:21 +0000 (Wed, 28 Mar 2007)
New Revision: 12410
Added:
trunk/freenet/src/freenet/node/StatusChangeCallback.java
Modified:
trunk/freenet/src/freenet/node/NodeDispatcher.java
trunk/freenet/src/freenet/node/RequestHandler.java
trunk/freenet/src/freenet/node/RequestSender.java
Log:
*!EXPERIMENTAL CODE!*
Try to spare one thread per transferred request (RequestHandler).
If it works it's cool, if it doesn't it might break the network...
Maybe the timeframe isn't good for such a commit; then revert it :)
Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java 2007-03-28 01:49:26 UTC
(rev 12409)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java 2007-03-28 12:29:21 UTC
(rev 12410)
@@ -168,10 +168,8 @@
return true;
}
//if(!node.lockUID(id)) return false;
- RequestHandler rh = new RequestHandler(m, id, node);
- Thread t = new Thread(rh, "RequestHandler for UID "+id);
- t.setDaemon(true);
- t.start();
+ RequestHandler rh = new RequestHandler(m, id, node); // Do we
need to keep a record of in flight RHs?
+ rh.run();
return true;
}
Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java 2007-03-28 01:49:26 UTC
(rev 12409)
+++ trunk/freenet/src/freenet/node/RequestHandler.java 2007-03-28 12:29:21 UTC
(rev 12410)
@@ -21,8 +21,13 @@
* is separated off into RequestSender so we get transfer coalescing
* and both ends for free.
*/
-public class RequestHandler implements Runnable, ByteCounter {
+public class RequestHandler implements Runnable, ByteCounter,
StatusChangeCallback {
+ private final static short INITIALIZE = 1;
+ private final static short WAIT_FOR_FIRST_REPLY = 2;
+ private final static short FINISHED = 3;
+ private short currentState = INITIALIZE;
+
private static boolean logMINOR;
final Message req;
final Node node;
@@ -35,9 +40,10 @@
private boolean finalTransferFailed = false;
final boolean resetClosestLoc;
- public String toString() {
- return super.toString()+" for "+uid;
- }
+ private short waitStatus = 0;
+ private int status = RequestSender.NOT_FINISHED;
+ private boolean shouldHaveStartedTransfer = false;
+ private RequestSender rs = null;
public RequestHandler(Message m, long id, Node n) {
req = m;
@@ -60,56 +66,73 @@
}
public void run() {
- int status = RequestSender.NOT_FINISHED;
- RequestSender rs = null;
try {
- if(logMINOR) Logger.minor(this, "Handling a request: "+uid);
- htl = source.decrementHTL(htl);
+ if(logMINOR) Logger.minor(this, "Handling a request: "+uid);
+ htl = source.decrementHTL(htl);
+
+ Message accepted = DMT.createFNPAccepted(uid);
+ source.sendAsync(accepted, null, 0, null);
+
+ Object o = node.makeRequestSender(key, htl, uid, source,
closestLoc, resetClosestLoc, false, true, false);
+ if(o instanceof KeyBlock) {
+ KeyBlock block = (KeyBlock) o;
+ Message df = createDataFound(block);
+ source.sendAsync(df, null, 0, null);
+ if(key instanceof NodeSSK) {
+ if(needsPubKey) {
+ DSAPublicKey key =
((NodeSSK)block.getKey()).getPubKey();
+ Message pk =
DMT.createFNPSSKPubKey(uid, key.asBytes());
+ if(logMINOR) Logger.minor(this,
"Sending PK: "+key+ ' ' +key.toLongString());
+ source.sendAsync(pk, null, 0, null);
+ }
+ status = RequestSender.SUCCESS; // for byte
logging
+ }
+ if(block instanceof CHKBlock) {
+ PartiallyReceivedBlock prb =
+ new
PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, Node.PACKET_SIZE,
block.getRawData());
+ BlockTransmitter bt =
+ new BlockTransmitter(node.usm, source,
uid, prb, node.outputThrottle, this);
+ node.addTransferringRequestHandler(uid);
+ if(bt.send())
+ status = RequestSender.SUCCESS; // for
byte logging
+ }
+ return;
+ }
+ rs = (RequestSender) o;
+
+ if(rs == null) { // ran out of htl?
+ Message dnf = DMT.createFNPDataNotFound(uid);
+ source.sendAsync(dnf, null, 0, null);
+ status = RequestSender.DATA_NOT_FOUND; // for byte
logging
+ return;
+ }
+
+ synchronized (this) {
+ currentState = WAIT_FOR_FIRST_REPLY;
+ }
+ rs.callbackWhenStatusChange(this, waitStatus);
+
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t, t);
+ _finally(); // Yes, we don't want the finally() block here
anyway
+ }
+ }
+
+ public void onStatusChange(short mask) {
+ waitStatus = mask;
+ if(currentState == WAIT_FOR_FIRST_REPLY) {
+ waitForFirstReply();
+ synchronized (this) {
+ currentState = FINISHED;
+ }
+ }
+ }
- Message accepted = DMT.createFNPAccepted(uid);
- source.send(accepted, null);
-
- Object o = node.makeRequestSender(key, htl, uid, source, closestLoc,
resetClosestLoc, false, true, false);
- if(o instanceof KeyBlock) {
- KeyBlock block = (KeyBlock) o;
- Message df = createDataFound(block);
- source.send(df, null);
- if(key instanceof NodeSSK) {
- if(needsPubKey) {
- DSAPublicKey key =
((NodeSSK)block.getKey()).getPubKey();
- Message pk = DMT.createFNPSSKPubKey(uid, key.asBytes());
- if(logMINOR) Logger.minor(this, "Sending PK: "+key+ ' '
+key.toLongString());
- source.send(pk, null);
- }
- status = RequestSender.SUCCESS; // for byte logging
- }
- if(block instanceof CHKBlock) {
- PartiallyReceivedBlock prb =
- new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK,
Node.PACKET_SIZE, block.getRawData());
- BlockTransmitter bt =
- new BlockTransmitter(node.usm, source, uid, prb,
node.outputThrottle, this);
- node.addTransferringRequestHandler(uid);
- if(bt.send())
- status = RequestSender.SUCCESS; // for byte logging
- }
- return;
- }
- rs = (RequestSender) o;
-
- if(rs == null) { // ran out of htl?
- Message dnf = DMT.createFNPDataNotFound(uid);
- source.send(dnf, null);
- status = RequestSender.DATA_NOT_FOUND; // for byte logging
- return;
- }
-
- boolean shouldHaveStartedTransfer = false;
-
- short waitStatus = 0;
-
- while(true) {
-
- waitStatus = rs.waitUntilStatusChange(waitStatus);
+ private void waitForFirstReply(){
+ synchronized (this) {
+ if(currentState != WAIT_FOR_FIRST_REPLY) return;
+ }
+ try {
if((waitStatus & RequestSender.WAIT_REJECTED_OVERLOAD) != 0) {
// Forward RejectedOverload
Message msg = DMT.createFNPRejectedOverload(uid, false);
@@ -119,7 +142,7 @@
if((waitStatus & RequestSender.WAIT_TRANSFERRING_DATA) != 0) {
// Is a CHK.
Message df = DMT.createFNPCHKDataFound(uid, rs.getHeaders());
- source.send(df, null);
+ source.sendAsync(df, null, 0, null);
PartiallyReceivedBlock prb = rs.getPRB();
BlockTransmitter bt =
new BlockTransmitter(node.usm, source, uid, prb,
node.outputThrottle, this);
@@ -132,13 +155,13 @@
status = rs.getStatus();
- if(status == RequestSender.NOT_FINISHED) continue;
+ if(status == RequestSender.NOT_FINISHED)
rs.callbackWhenStatusChange(this, waitStatus);
switch(status) {
case RequestSender.NOT_FINISHED:
case RequestSender.DATA_NOT_FOUND:
Message dnf = DMT.createFNPDataNotFound(uid);
- source.send(dnf, this);
+ source.sendAsync(dnf, null, 0, null);
return;
case RequestSender.GENERATED_REJECTED_OVERLOAD:
case RequestSender.TIMED_OUT:
@@ -146,21 +169,21 @@
// Locally generated.
// Propagate back to source who needs to reduce send rate
Message reject = DMT.createFNPRejectedOverload(uid, true);
- source.send(reject, this);
+ source.sendAsync(reject, null, 0, null);
return;
case RequestSender.ROUTE_NOT_FOUND:
// Tell source
Message rnf = DMT.createFNPRouteNotFound(uid, rs.getHTL());
- source.send(rnf, this);
+ source.sendAsync(rnf, null, 0, null);
return;
case RequestSender.SUCCESS:
if(key instanceof NodeSSK) {
Message df = DMT.createFNPSSKDataFound(uid,
rs.getHeaders(), rs.getSSKData());
- source.send(df, this);
+ source.sendAsync(df, null, 0, null);
node.sentPayload(rs.getSSKData().length);
if(needsPubKey) {
Message pk = DMT.createFNPSSKPubKey(uid,
((NodeSSK)rs.getSSKBlock().getKey()).getPubKey().asBytes());
- source.send(pk, this);
+ source.sendAsync(pk, null, 0, null);
}
} else if(!rs.transferStarted()) {
Logger.error(this, "Status is SUCCESS but we
never started a transfer on "+uid);
@@ -171,55 +194,58 @@
if(shouldHaveStartedTransfer)
throw new IllegalStateException("Got
status code "+status+" but transfer not started");
shouldHaveStartedTransfer = true;
- continue; // should have started transfer
+ rs.callbackWhenStatusChange(this, waitStatus);
}
reject = DMT.createFNPRejectedOverload(uid, true);
- source.send(reject, this);
+ source.sendAsync(reject, null, 0, null);
return;
case RequestSender.TRANSFER_FAILED:
if(key instanceof NodeCHK) {
if(shouldHaveStartedTransfer)
throw new IllegalStateException("Got
status code "+status+" but transfer not started");
shouldHaveStartedTransfer = true;
- continue; // should have started transfer
+ rs.callbackWhenStatusChange(this, waitStatus);
}
// Other side knows, right?
return;
default:
throw new IllegalStateException("Unknown status code
"+status);
}
- }
} catch (Throwable t) {
Logger.error(this, "Caught "+t, t);
} finally {
- node.removeTransferringRequestHandler(uid);
- node.unlockUID(uid, key instanceof NodeSSK, false);
- if((!finalTransferFailed) && rs != null && status !=
RequestSender.TIMED_OUT && status != RequestSender.GENERATED_REJECTED_OVERLOAD
- && status != RequestSender.INTERNAL_ERROR) {
- int sent = rs.getTotalSentBytes() + sentBytes;
- int rcvd = rs.getTotalReceivedBytes() + receivedBytes;
- if(key instanceof NodeSSK) {
- if(logMINOR) Logger.minor(this, "Remote SSK fetch cost
"+sent+ '/' +rcvd+" bytes ("+status+ ')');
-
node.nodeStats.remoteSskFetchBytesSentAverage.report(sent);
-
node.nodeStats.remoteSskFetchBytesReceivedAverage.report(rcvd);
- if(status == RequestSender.SUCCESS) {
-
node.nodeStats.successfulSskFetchBytesSentAverage.report(sent);
-
node.nodeStats.successfulSskFetchBytesReceivedAverage.report(sent);
- }
- } else {
- if(logMINOR) Logger.minor(this, "Remote CHK fetch cost
"+sent+ '/' +rcvd+" bytes ("+status+ ')');
-
node.nodeStats.remoteChkFetchBytesSentAverage.report(sent);
-
node.nodeStats.remoteChkFetchBytesReceivedAverage.report(rcvd);
- if(status == RequestSender.SUCCESS) {
-
node.nodeStats.successfulChkFetchBytesSentAverage.report(sent);
-
node.nodeStats.successfulChkFetchBytesReceivedAverage.report(sent);
- }
- }
- }
-
+ _finally();
}
}
+ private synchronized void _finally() {
+ currentState = FINISHED;
+ node.removeTransferringRequestHandler(uid);
+ node.unlockUID(uid, key instanceof NodeSSK, false);
+ if((!finalTransferFailed) && rs != null && status !=
RequestSender.TIMED_OUT && status != RequestSender.GENERATED_REJECTED_OVERLOAD
+ && status != RequestSender.INTERNAL_ERROR) {
+ int sent = rs.getTotalSentBytes() + sentBytes;
+ int rcvd = rs.getTotalReceivedBytes() + receivedBytes;
+ if(key instanceof NodeSSK) {
+ if(logMINOR) Logger.minor(this, "Remote SSK fetch cost
"+sent+ '/' +rcvd+" bytes ("+status+ ')');
+ node.nodeStats.remoteSskFetchBytesSentAverage.report(sent);
+ node.nodeStats.remoteSskFetchBytesReceivedAverage.report(rcvd);
+ if(status == RequestSender.SUCCESS) {
+
node.nodeStats.successfulSskFetchBytesSentAverage.report(sent);
+
node.nodeStats.successfulSskFetchBytesReceivedAverage.report(sent);
+ }
+ } else {
+ if(logMINOR) Logger.minor(this, "Remote CHK fetch cost
"+sent+ '/' +rcvd+" bytes ("+status+ ')');
+ node.nodeStats.remoteChkFetchBytesSentAverage.report(sent);
+ node.nodeStats.remoteChkFetchBytesReceivedAverage.report(rcvd);
+ if(status == RequestSender.SUCCESS) {
+
node.nodeStats.successfulChkFetchBytesSentAverage.report(sent);
+
node.nodeStats.successfulChkFetchBytesReceivedAverage.report(sent);
+ }
+ }
+ }
+ }
+
private Message createDataFound(KeyBlock block) {
if(block instanceof CHKBlock)
return DMT.createFNPCHKDataFound(uid,
block.getRawHeaders());
@@ -249,5 +275,8 @@
public void sentPayload(int x) {
node.sentPayload(x);
}
-
+
+ public String toString() {
+ return super.toString()+" for "+uid;
+ }
}
Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java 2007-03-28 01:49:26 UTC
(rev 12409)
+++ trunk/freenet/src/freenet/node/RequestSender.java 2007-03-28 12:29:21 UTC
(rev 12410)
@@ -519,7 +519,53 @@
}
}
+
/**
+ * Same as waitUntilStatusChange but non blocking
+ * @see waitUntilStatusChange(short mask)
+ */
+ public void callbackWhenStatusChange(final StatusChangeCallback cb, final
short mask) {
+
+ if(mask == WAIT_ALL) throw new IllegalArgumentException("Cannot ignore
all!");
+
+ Runnable whenStatusChange = new Runnable(){
+ boolean isRunning = false;
+
+ public void run(){
+ synchronized (this) {
+ if(isRunning) return;
+ isRunning = true;
+ }
+ _realRun();
+ isRunning = false;
+ }
+
+ private void _realRun() {
+ short current = mask; // If any bits are set already,
we ignore those states.
+
+ synchronized (cb) {
+ if(hasForwardedRejectedOverload)
+ current |= WAIT_REJECTED_OVERLOAD;
+
+ if(prb != null)
+ current |= WAIT_TRANSFERRING_DATA;
+
+ if(status != NOT_FINISHED)
+ current |= WAIT_FINISHED;
+
+ }
+
+ if(current != mask)
+ cb.onStatusChange(current);
+ else
+ node.ps.queueTimedJob(this, 10000);
+ }
+ };
+
+ whenStatusChange.run();
+ }
+
+ /**
* Wait until we have a terminal status code.
*/
public synchronized void waitUntilFinished() {
Added: trunk/freenet/src/freenet/node/StatusChangeCallback.java
===================================================================
--- trunk/freenet/src/freenet/node/StatusChangeCallback.java
(rev 0)
+++ trunk/freenet/src/freenet/node/StatusChangeCallback.java 2007-03-28
12:29:21 UTC (rev 12410)
@@ -0,0 +1,8 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.node;
+
+public interface StatusChangeCallback {
+ public abstract void onStatusChange(short mask);
+}
\ No newline at end of file