Author: nextgens
Date: 2007-03-28 22:12:30 +0000 (Wed, 28 Mar 2007)
New Revision: 12417
Removed:
trunk/freenet/src/freenet/node/StatusChangeCallback.java
Modified:
trunk/freenet/src/freenet/node/NodeDispatcher.java
trunk/freenet/src/freenet/node/RequestHandler.java
trunk/freenet/src/freenet/node/RequestSender.java
Log:
Revert changes since r12409; my code isn't ready for a release. It also seems
it's time for #1021 (BlockTransmitter has to be refactored: we can't keep the
callback on RequestHandler for too long, because it hangs the PacketSender loop).
Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java 2007-03-28 21:56:36 UTC
(rev 12416)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java 2007-03-28 22:12:30 UTC
(rev 12417)
@@ -168,8 +168,10 @@
return true;
}
//if(!node.lockUID(id)) return false;
- RequestHandler rh = new RequestHandler(m, id, node); // Do we
need to keep a record of in flight RHs?
- rh.run();
+ RequestHandler rh = new RequestHandler(m, id, node);
+ Thread t = new Thread(rh, "RequestHandler for UID "+id);
+ t.setDaemon(true);
+ t.start();
return true;
}
Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java 2007-03-28 21:56:36 UTC
(rev 12416)
+++ trunk/freenet/src/freenet/node/RequestHandler.java 2007-03-28 22:12:30 UTC
(rev 12417)
@@ -21,13 +21,8 @@
* is separated off into RequestSender so we get transfer coalescing
* and both ends for free.
*/
-public class RequestHandler implements ByteCounter, StatusChangeCallback {
+public class RequestHandler implements Runnable, ByteCounter {
- private final static short INITIALIZE = 1;
- private final static short WAIT_FOR_FIRST_REPLY = 2;
- private final static short FINISHED = 3;
- private short currentState = INITIALIZE;
-
private static boolean logMINOR;
final Message req;
final Node node;
@@ -40,10 +35,9 @@
private boolean finalTransferFailed = false;
final boolean resetClosestLoc;
- private short waitStatus = 0;
- private int status = RequestSender.NOT_FINISHED;
- private boolean shouldHaveStartedTransfer = false;
- private RequestSender rs = null;
+ public String toString() {
+ return super.toString()+" for "+uid;
+ }
public RequestHandler(Message m, long id, Node n) {
req = m;
@@ -66,73 +60,56 @@
}
public void run() {
+ int status = RequestSender.NOT_FINISHED;
+ RequestSender rs = null;
try {
- if(logMINOR) Logger.minor(this, "Handling a request: "+uid);
- htl = source.decrementHTL(htl);
-
- Message accepted = DMT.createFNPAccepted(uid);
- source.sendAsync(accepted, null, 0, null);
-
- Object o = node.makeRequestSender(key, htl, uid, source,
closestLoc, resetClosestLoc, false, true, false);
- if(o instanceof KeyBlock) {
- KeyBlock block = (KeyBlock) o;
- Message df = createDataFound(block);
- source.sendAsync(df, null, 0, null);
- if(key instanceof NodeSSK) {
- if(needsPubKey) {
- DSAPublicKey key =
((NodeSSK)block.getKey()).getPubKey();
- Message pk =
DMT.createFNPSSKPubKey(uid, key.asBytes());
- if(logMINOR) Logger.minor(this,
"Sending PK: "+key+ ' ' +key.toLongString());
- source.sendAsync(pk, null, 0, null);
- }
- status = RequestSender.SUCCESS; // for byte
logging
- }
- if(block instanceof CHKBlock) {
- PartiallyReceivedBlock prb =
- new
PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, Node.PACKET_SIZE,
block.getRawData());
- BlockTransmitter bt =
- new BlockTransmitter(node.usm, source,
uid, prb, node.outputThrottle, this);
- node.addTransferringRequestHandler(uid);
- if(bt.send())
- status = RequestSender.SUCCESS; // for
byte logging
- }
- return;
- }
- rs = (RequestSender) o;
-
- if(rs == null) { // ran out of htl?
- Message dnf = DMT.createFNPDataNotFound(uid);
- source.sendAsync(dnf, null, 0, null);
- status = RequestSender.DATA_NOT_FOUND; // for byte
logging
- return;
- }
-
- synchronized (this) {
- currentState = WAIT_FOR_FIRST_REPLY;
- }
- rs.callbackWhenStatusChange(this, waitStatus);
-
- } catch (Throwable t) {
- Logger.error(this, "Caught "+t, t);
- _finally(); // Yes, we don't want the finally() block here
anyway
- }
- }
-
- public void onStatusChange(short mask) {
- waitStatus = mask;
- if(currentState == WAIT_FOR_FIRST_REPLY) {
- waitForFirstReply();
- synchronized (this) {
- currentState = FINISHED;
- }
- }
- }
+ if(logMINOR) Logger.minor(this, "Handling a request: "+uid);
+ htl = source.decrementHTL(htl);
- private void waitForFirstReply(){
- synchronized (this) {
- if(currentState != WAIT_FOR_FIRST_REPLY) return;
- }
- try {
+ Message accepted = DMT.createFNPAccepted(uid);
+ source.send(accepted, null);
+
+ Object o = node.makeRequestSender(key, htl, uid, source, closestLoc,
resetClosestLoc, false, true, false);
+ if(o instanceof KeyBlock) {
+ KeyBlock block = (KeyBlock) o;
+ Message df = createDataFound(block);
+ source.send(df, null);
+ if(key instanceof NodeSSK) {
+ if(needsPubKey) {
+ DSAPublicKey key =
((NodeSSK)block.getKey()).getPubKey();
+ Message pk = DMT.createFNPSSKPubKey(uid, key.asBytes());
+ if(logMINOR) Logger.minor(this, "Sending PK: "+key+ ' '
+key.toLongString());
+ source.send(pk, null);
+ }
+ status = RequestSender.SUCCESS; // for byte logging
+ }
+ if(block instanceof CHKBlock) {
+ PartiallyReceivedBlock prb =
+ new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK,
Node.PACKET_SIZE, block.getRawData());
+ BlockTransmitter bt =
+ new BlockTransmitter(node.usm, source, uid, prb,
node.outputThrottle, this);
+ node.addTransferringRequestHandler(uid);
+ if(bt.send())
+ status = RequestSender.SUCCESS; // for byte logging
+ }
+ return;
+ }
+ rs = (RequestSender) o;
+
+ if(rs == null) { // ran out of htl?
+ Message dnf = DMT.createFNPDataNotFound(uid);
+ source.send(dnf, null);
+ status = RequestSender.DATA_NOT_FOUND; // for byte logging
+ return;
+ }
+
+ boolean shouldHaveStartedTransfer = false;
+
+ short waitStatus = 0;
+
+ while(true) {
+
+ waitStatus = rs.waitUntilStatusChange(waitStatus);
if((waitStatus & RequestSender.WAIT_REJECTED_OVERLOAD) != 0) {
// Forward RejectedOverload
Message msg = DMT.createFNPRejectedOverload(uid, false);
@@ -142,7 +119,7 @@
if((waitStatus & RequestSender.WAIT_TRANSFERRING_DATA) != 0) {
// Is a CHK.
Message df = DMT.createFNPCHKDataFound(uid, rs.getHeaders());
- source.sendAsync(df, null, 0, null);
+ source.send(df, null);
PartiallyReceivedBlock prb = rs.getPRB();
BlockTransmitter bt =
new BlockTransmitter(node.usm, source, uid, prb,
node.outputThrottle, this);
@@ -155,13 +132,13 @@
status = rs.getStatus();
- if(status == RequestSender.NOT_FINISHED)
rs.callbackWhenStatusChange(this, waitStatus);
+ if(status == RequestSender.NOT_FINISHED) continue;
switch(status) {
case RequestSender.NOT_FINISHED:
case RequestSender.DATA_NOT_FOUND:
Message dnf = DMT.createFNPDataNotFound(uid);
- source.sendAsync(dnf, null, 0, null);
+ source.send(dnf, this);
return;
case RequestSender.GENERATED_REJECTED_OVERLOAD:
case RequestSender.TIMED_OUT:
@@ -169,21 +146,21 @@
// Locally generated.
// Propagate back to source who needs to reduce send rate
Message reject = DMT.createFNPRejectedOverload(uid, true);
- source.sendAsync(reject, null, 0, null);
+ source.send(reject, this);
return;
case RequestSender.ROUTE_NOT_FOUND:
// Tell source
Message rnf = DMT.createFNPRouteNotFound(uid, rs.getHTL());
- source.sendAsync(rnf, null, 0, null);
+ source.send(rnf, this);
return;
case RequestSender.SUCCESS:
if(key instanceof NodeSSK) {
Message df = DMT.createFNPSSKDataFound(uid,
rs.getHeaders(), rs.getSSKData());
- source.sendAsync(df, null, 0, null);
+ source.send(df, this);
node.sentPayload(rs.getSSKData().length);
if(needsPubKey) {
Message pk = DMT.createFNPSSKPubKey(uid,
((NodeSSK)rs.getSSKBlock().getKey()).getPubKey().asBytes());
- source.sendAsync(pk, null, 0, null);
+ source.send(pk, this);
}
} else if(!rs.transferStarted()) {
Logger.error(this, "Status is SUCCESS but we
never started a transfer on "+uid);
@@ -194,60 +171,55 @@
if(shouldHaveStartedTransfer)
throw new IllegalStateException("Got
status code "+status+" but transfer not started");
shouldHaveStartedTransfer = true;
- rs.callbackWhenStatusChange(this, waitStatus);
+ continue; // should have started transfer
}
reject = DMT.createFNPRejectedOverload(uid, true);
- source.sendAsync(reject, null, 0, null);
+ source.send(reject, this);
return;
case RequestSender.TRANSFER_FAILED:
if(key instanceof NodeCHK) {
if(shouldHaveStartedTransfer)
throw new IllegalStateException("Got
status code "+status+" but transfer not started");
shouldHaveStartedTransfer = true;
- rs.callbackWhenStatusChange(this, waitStatus);
+ continue; // should have started transfer
}
// Other side knows, right?
return;
default:
throw new IllegalStateException("Unknown status code
"+status);
}
+ }
} catch (Throwable t) {
Logger.error(this, "Caught "+t, t);
} finally {
- _finally();
+ node.removeTransferringRequestHandler(uid);
+ node.unlockUID(uid, key instanceof NodeSSK, false);
+ if((!finalTransferFailed) && rs != null && status !=
RequestSender.TIMED_OUT && status != RequestSender.GENERATED_REJECTED_OVERLOAD
+ && status != RequestSender.INTERNAL_ERROR) {
+ int sent = rs.getTotalSentBytes() + sentBytes;
+ int rcvd = rs.getTotalReceivedBytes() + receivedBytes;
+ if(key instanceof NodeSSK) {
+ if(logMINOR) Logger.minor(this, "Remote SSK fetch cost
"+sent+ '/' +rcvd+" bytes ("+status+ ')');
+
node.nodeStats.remoteSskFetchBytesSentAverage.report(sent);
+
node.nodeStats.remoteSskFetchBytesReceivedAverage.report(rcvd);
+ if(status == RequestSender.SUCCESS) {
+
node.nodeStats.successfulSskFetchBytesSentAverage.report(sent);
+
node.nodeStats.successfulSskFetchBytesReceivedAverage.report(sent);
+ }
+ } else {
+ if(logMINOR) Logger.minor(this, "Remote CHK fetch cost
"+sent+ '/' +rcvd+" bytes ("+status+ ')');
+
node.nodeStats.remoteChkFetchBytesSentAverage.report(sent);
+
node.nodeStats.remoteChkFetchBytesReceivedAverage.report(rcvd);
+ if(status == RequestSender.SUCCESS) {
+
node.nodeStats.successfulChkFetchBytesSentAverage.report(sent);
+
node.nodeStats.successfulChkFetchBytesReceivedAverage.report(sent);
+ }
+ }
+ }
+
}
}
- private synchronized void _finally() {
- if(logMINOR && (currentState != WAIT_FOR_FIRST_REPLY))
- Logger.minor(this, "Hmm, wtf ?");
- currentState = FINISHED;
- node.removeTransferringRequestHandler(uid);
- node.unlockUID(uid, key instanceof NodeSSK, false);
- if((!finalTransferFailed) && rs != null && status !=
RequestSender.TIMED_OUT && status != RequestSender.GENERATED_REJECTED_OVERLOAD
- && status != RequestSender.INTERNAL_ERROR) {
- int sent = rs.getTotalSentBytes() + sentBytes;
- int rcvd = rs.getTotalReceivedBytes() + receivedBytes;
- if(key instanceof NodeSSK) {
- if(logMINOR) Logger.minor(this, "Remote SSK fetch cost
"+sent+ '/' +rcvd+" bytes ("+status+ ')');
- node.nodeStats.remoteSskFetchBytesSentAverage.report(sent);
- node.nodeStats.remoteSskFetchBytesReceivedAverage.report(rcvd);
- if(status == RequestSender.SUCCESS) {
-
node.nodeStats.successfulSskFetchBytesSentAverage.report(sent);
-
node.nodeStats.successfulSskFetchBytesReceivedAverage.report(sent);
- }
- } else {
- if(logMINOR) Logger.minor(this, "Remote CHK fetch cost
"+sent+ '/' +rcvd+" bytes ("+status+ ')');
- node.nodeStats.remoteChkFetchBytesSentAverage.report(sent);
- node.nodeStats.remoteChkFetchBytesReceivedAverage.report(rcvd);
- if(status == RequestSender.SUCCESS) {
-
node.nodeStats.successfulChkFetchBytesSentAverage.report(sent);
-
node.nodeStats.successfulChkFetchBytesReceivedAverage.report(sent);
- }
- }
- }
- }
-
private Message createDataFound(KeyBlock block) {
if(block instanceof CHKBlock)
return DMT.createFNPCHKDataFound(uid,
block.getRawHeaders());
@@ -277,8 +249,5 @@
public void sentPayload(int x) {
node.sentPayload(x);
}
-
- public String toString() {
- return super.toString()+" for "+uid;
- }
+
}
Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java 2007-03-28 21:56:36 UTC
(rev 12416)
+++ trunk/freenet/src/freenet/node/RequestSender.java 2007-03-28 22:12:30 UTC
(rev 12417)
@@ -519,54 +519,7 @@
}
}
-
/**
- * Same as waitUntilStatusChange but non blocking
- * @see waitUntilStatusChange(short mask)
- */
- public void callbackWhenStatusChange(final StatusChangeCallback cb, final
short mask) {
-
- if(mask == WAIT_ALL) throw new IllegalArgumentException("Cannot ignore
all!");
-
- final Runnable whenStatusChange = new Runnable(){
- private boolean isRunning = false;
-
- public void run(){
- synchronized (this) {
- if(isRunning) return;
- isRunning = true;
- }
- _realRun();
- synchronized (this) {
- isRunning = false;
- }
- }
-
- private void _realRun() {
- short current;
- synchronized (cb) {
- current = mask; // If any bits are set already,
we ignore those states.
-
- if(hasForwardedRejectedOverload)
- current |= WAIT_REJECTED_OVERLOAD;
-
- if(prb != null)
- current |= WAIT_TRANSFERRING_DATA;
-
- if(status != NOT_FINISHED)
- current |= WAIT_FINISHED;
-
- if(current == mask)
- node.ps.queueTimedJob(this, 10000);
- }
- cb.onStatusChange(current);
- }
- };
-
- whenStatusChange.run();
- }
-
- /**
* Wait until we have a terminal status code.
*/
public synchronized void waitUntilFinished() {
Deleted: trunk/freenet/src/freenet/node/StatusChangeCallback.java
===================================================================
--- trunk/freenet/src/freenet/node/StatusChangeCallback.java 2007-03-28
21:56:36 UTC (rev 12416)
+++ trunk/freenet/src/freenet/node/StatusChangeCallback.java 2007-03-28
22:12:30 UTC (rev 12417)
@@ -1,8 +0,0 @@
-/* This code is part of Freenet. It is distributed under the GNU General
- * Public License, version 2 (or at your option any later version). See
- * http://www.gnu.org/ for further details of the GPL. */
-package freenet.node;
-
-public interface StatusChangeCallback {
- public abstract void onStatusChange(short mask);
-}
\ No newline at end of file