Author: robert
Date: 2008-01-21 19:17:14 +0000 (Mon, 21 Jan 2008)
New Revision: 17190
Modified:
trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
trunk/freenet/src/freenet/node/RequestHandler.java
trunk/freenet/src/freenet/node/RequestSender.java
Log:
Use a callback for RequestSender status; now only 1 thread per request (rather than 2)
Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2008-01-21
19:10:35 UTC (rev 17189)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2008-01-21
19:17:14 UTC (rev 17190)
@@ -63,6 +63,8 @@
final DoubleTokenBucket _masterThrottle;
final ByteCounter _ctr;
final int PACKET_SIZE;
+ private boolean asyncExitStatus;
+ private boolean asyncExitStatusSet;
public BlockTransmitter(MessageCore usm, PeerContext destination, long
uid, PartiallyReceivedBlock source, DoubleTokenBucket masterThrottle,
ByteCounter ctr) {
_usm = usm;
@@ -289,7 +291,16 @@
*/
public void sendAsync(final Executor executor) {
executor.execute(new Runnable() {
- public void run() { send(executor); } },
+ public void run() {
+ try {
+
asyncExitStatus=send(executor);
+ } finally {
+ synchronized
(BlockTransmitter.this) {
+ asyncExitStatusSet=true;
+
BlockTransmitter.this.notifyAll();
+ }
+ }
+ } },
"BlockTransmitter:sendAsync() for "+this);
}
@@ -304,6 +315,19 @@
}
}
}
+
+ public boolean getAsyncExitStatus() {
+ synchronized (this) {
+ while (!asyncExitStatusSet) {
+ try {
+ this.wait(10*1000);
+ } catch (InterruptedException e) {
+ //ignore
+ }
+ }
+ }
+ return asyncExitStatus;
+ }
public PeerContext getDestination() {
return _destination;
Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java 2008-01-21 19:10:35 UTC
(rev 17189)
+++ trunk/freenet/src/freenet/node/RequestHandler.java 2008-01-21 19:17:14 UTC
(rev 17190)
@@ -28,7 +28,7 @@
* is separated off into RequestSender so we get transfer coalescing
* and both ends for free.
*/
-public class RequestHandler implements Runnable, ByteCounter {
+public class RequestHandler implements Runnable, ByteCounter,
RequestSender.Listener {
private static boolean logMINOR;
final Message req;
@@ -45,6 +45,10 @@
private RequestSender rs;
private int status = RequestSender.NOT_FINISHED;
private boolean appliedByteCounts=false;
+ private boolean sentRejectedOverload = false;
+ private long searchStartTime;
+ private long responseDeadline;
+ private BlockTransmitter bt;
public String toString() {
return super.toString()+" for "+uid;
@@ -76,12 +80,14 @@
freenet.support.Logger.OSThread.logPID(this);
try {
realRun();
+ //The last thing that realRun() does is register as a
request-sender listener, so any exception here is the end.
} catch (NotConnectedException e) {
- // Ignore, normal
+ Logger.normal(this, "requestor gone, could not start request
handler wait");
+ node.removeTransferringRequestHandler(uid);
+ node.unlockUID(uid, key instanceof NodeSSK, false, false);
} catch (Throwable t) {
Logger.error(this, "Caught "+t, t);
- } finally {
- node.removeTransferringRequestHandler(uid);
+ node.removeTransferringRequestHandler(uid);
node.unlockUID(uid, key instanceof NodeSSK, false, false);
}
}
@@ -150,26 +156,16 @@
return;
}
- boolean shouldHaveStartedTransfer = false;
- boolean sentRejectedOverload = false;
+ //If we cannot respond before this time, the 'source' node has already
fatally timed out (and we need not return packets which will not be claimed)
+ searchStartTime = System.currentTimeMillis();
+ responseDeadline = searchStartTime +
RequestSender.FETCH_TIMEOUT + source.getProbableSendQueueTime();
+
+ rs.addListener(this);
+ }
- //If we cannot respond before this time, the 'source' node has
already fatally timed out (and we need not return packets which will not be
claimed)
- long searchStartTime = System.currentTimeMillis();
- long responseDeadline = searchStartTime +
RequestSender.FETCH_TIMEOUT + source.getProbableSendQueueTime();
- short waitStatus = 0;
-
- while(true) {
-
- waitStatus = rs.waitUntilStatusChange(waitStatus);
- long now = System.currentTimeMillis();
-
- if (now > responseDeadline) {
- Logger.error(this, "requestsender took too long
to respond to requestor ("+TimeUtil.formatTime((now - searchStartTime), 2,
true)+"/"+rs.getStatus()+")");
- applyByteCounts();
- return;
- }
-
- if((waitStatus & RequestSender.WAIT_REJECTED_OVERLOAD) != 0 &&
!sentRejectedOverload) {
+ public void onReceivedRejectOverload() {
+ try {
+ if(!sentRejectedOverload) {
// Forward RejectedOverload
//Note: This message is only decernable from
the terminal messages by the IS_LOCAL flag being false. (!IS_LOCAL)->!Terminal
Message msg = DMT.createFNPRejectedOverload(uid, false);
@@ -177,17 +173,30 @@
//If the status changes (e.g. to SUCCESS),
there is little need to send yet another reject overload.
sentRejectedOverload=true;
}
-
- if((waitStatus & RequestSender.WAIT_TRANSFERRING_DATA) != 0) {
+ } catch (NotConnectedException e) {
+ Logger.normal(this, "requestor is gone, can't forward
reject overload");
+ }
+ }
+
+ public void onCHKTransferBegins() {
+ try {
// Is a CHK.
Message df = DMT.createFNPCHKDataFound(uid, rs.getHeaders());
source.sendAsync(df, null, 0, this);
PartiallyReceivedBlock prb = rs.getPRB();
- BlockTransmitter bt =
+ bt =
new BlockTransmitter(node.usm, source, uid, prb,
node.outputThrottle, this);
node.addTransferringRequestHandler(uid);
- if(bt.send(node.executor)) {
+ bt.sendAsync(node.executor);
+ } catch (NotConnectedException e) {
+ Logger.normal(this, "requestor is gone, can't begin CHK
transfer");
+ }
+ }
+
+ private void waitAndFinishCHKTransfer() throws NotConnectedException {
+ if (logMINOR) Logger.minor(this, "Waiting for CHK transfer to
finish");
+ if(bt.getAsyncExitStatus()) {
status = rs.getStatus();
// Successful CHK transfer, maybe path fold
finishOpennetChecked();
@@ -196,14 +205,24 @@
status = rs.getStatus();
//for byte logging, since the block is
the 'terminal' message.
applyByteCounts();
+ unregisterRequestHandlerWithNode();
}
- return;
- }
+ }
+
+ public void onRequestSenderFinished(int status) {
+ long now = System.currentTimeMillis();
+
+ if (now > responseDeadline) {
+ Logger.error(this, "requestsender took too long to
respond to requestor ("+TimeUtil.formatTime((now - searchStartTime), 2,
true)+"/"+rs.getStatus()+")");
+ applyByteCounts();
+ unregisterRequestHandlerWithNode();
+ return;
+ }
+
+ if(status == RequestSender.NOT_FINISHED)
+ Logger.error(this, "onFinished() but not finished?");
- status = rs.getStatus();
-
- if(status == RequestSender.NOT_FINISHED) continue;
-
+ try {
switch(status) {
case RequestSender.NOT_FINISHED:
case RequestSender.DATA_NOT_FOUND:
@@ -239,45 +258,56 @@
} else {
sendTerminal(df);
}
- return;
} else {
- if(!rs.transferStarted()) {
+ if(bt == null) {
// Bug! This is impossible!
Logger.error(this, "Status is SUCCESS
but we never started a transfer on "+uid);
- // Could be a wierd synchronization
bug, but we don't want to wait forever, so treat it as overload.
+ // Obviously this node is confused,
send a terminal reject to make sure the requestor is not waiting forever.
reject = DMT.createFNPRejectedOverload(uid, true);
sendTerminal(reject);
- return;
} else {
- // Race condition. We need to go around
the loop again and pick up the data transfer
- // in waitStatus.
+ waitAndFinishCHKTransfer();
}
- // Either way, go back around the loop.
- continue;
}
+ return;
case RequestSender.VERIFY_FAILURE:
if(key instanceof NodeCHK) {
- if(shouldHaveStartedTransfer)
- throw new IllegalStateException("Got
status code "+status+" but transfer not started");
- shouldHaveStartedTransfer = true;
- continue; // should have started transfer
+ if(bt == null) {
+ // Bug! This is impossible!
+ Logger.error(this, "Status is
VERIFY_FAILURE but we never started a transfer on "+uid);
+ // Obviously this node
is confused, send a terminal reject to make sure the requestor is not waiting
forever.
+ reject = DMT.createFNPRejectedOverload(uid, true);
+ sendTerminal(reject);
+ } else {
+ //Verify fails after
receive() is complete, so we might as well propagate it...
+ waitAndFinishCHKTransfer();
+ }
+ return;
}
reject = DMT.createFNPRejectedOverload(uid, true);
sendTerminal(reject);
return;
case RequestSender.TRANSFER_FAILED:
if(key instanceof NodeCHK) {
- if(shouldHaveStartedTransfer)
- throw new IllegalStateException("Got
status code "+status+" but transfer not started");
- shouldHaveStartedTransfer = true;
- continue; // should have started transfer
+ if(bt == null) {
+ // Bug! This is impossible!
+ Logger.error(this, "Status is
TRANSFER_FAILED but we never started a transfer on "+uid);
+ // Obviously this node
is confused, send a terminal reject to make sure the requestor is not waiting
forever.
+ reject = DMT.createFNPRejectedOverload(uid, true);
+ sendTerminal(reject);
+ } else {
+ waitAndFinishCHKTransfer();
+ }
+ return;
}
- // Other side knows, right?
+ Logger.error(this, "finish(TRANSFER_FAILED) should not
be called on SSK?!?!");
return;
default:
throw new IllegalStateException("Unknown status code
"+status);
}
- }
+ } catch (NotConnectedException e) {
+ Logger.normal(this, "requestor is gone, can't send
terminal message");
+ }
}
/**
@@ -313,10 +343,16 @@
} else {
//also for byte logging, since the block is the 'terminal'
message.
applyByteCounts();
+ unregisterRequestHandlerWithNode();
}
}
}
+ private void unregisterRequestHandlerWithNode() {
+ node.removeTransferringRequestHandler(uid);
+ node.unlockUID(uid, key instanceof NodeSSK, false, false);
+ }
+
/**
* Sends the 'final' packet of a request in such a way that the thread can
be freed (made non-runnable/exit)
* and the byte counter will still be accurate.
@@ -352,6 +388,7 @@
public void sent() {
//For byte counting, this relies on the fact that the callback
will only be excuted once.
applyByteCounts();
+ unregisterRequestHandlerWithNode();
}
}
@@ -366,6 +403,7 @@
(node.passOpennetRefsThroughDarknet() ||
source.isOpennet()) &&
finishOpennetInner(om)) {
applyByteCounts();
+ unregisterRequestHandlerWithNode();
return;
}
@@ -383,6 +421,7 @@
if(om != null && (source.isOpennet() ||
node.passOpennetRefsThroughDarknet()) &&
finishOpennetNoRelayInner(om)) {
applyByteCounts();
+ unregisterRequestHandlerWithNode();
return;
}
Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java 2008-01-21 19:10:35 UTC
(rev 17189)
+++ trunk/freenet/src/freenet/node/RequestSender.java 2008-01-21 19:17:14 UTC
(rev 17190)
@@ -3,7 +3,9 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.node;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.Iterator;
import freenet.crypt.CryptFormatException;
import freenet.crypt.DSAPublicKey;
@@ -70,6 +72,8 @@
private byte[] sskData;
private SSKBlock block;
private boolean hasForwarded;
+
+ private ArrayList listeners=new ArrayList();
// Terminal status
// Always set finished AFTER setting the reason flag
@@ -433,7 +437,8 @@
synchronized(this) {
notifyAll();
}
-
+ fireCHKTransferBegins();
+
BlockReceiver br = new BlockReceiver(node.usm,
next, uid, prb, this);
try {
@@ -580,10 +585,13 @@
private volatile boolean hasForwardedRejectedOverload;
/** Forward RejectedOverload to the request originator */
- private synchronized void forwardRejectedOverload() {
- if(hasForwardedRejectedOverload) return;
- hasForwardedRejectedOverload = true;
- notifyAll();
+ private void forwardRejectedOverload() {
+ synchronized (this) {
+ if(hasForwardedRejectedOverload) return;
+ hasForwardedRejectedOverload = true;
+ notifyAll();
+ }
+ fireReceivedRejectOverload();
}
public PartiallyReceivedBlock getPRB() {
@@ -659,20 +667,27 @@
if(status == SUCCESS)
successFrom = next;
}
-
+
if(status == SUCCESS) {
if(next != null) {
next.onSuccess(false, key instanceof NodeSSK);
}
node.nodeStats.requestCompleted(true, source != null, key
instanceof NodeSSK);
+ //NOTE: because of the requesthandler implementation, this will block and wait
+ // for downstream transfers on a CHK. The opennet stuff introduces
+ // a delay of its own if we don't get the expected message.
+ fireRequestSenderFinished(code);
+
if(key instanceof NodeCHK && next != null &&
(next.isOpennet() ||
node.passOpennetRefsThroughDarknet()) ) {
finishOpennet(next);
} else
finishOpennetNull(next);
- } else
+ } else {
node.nodeStats.requestCompleted(false, source != null, key
instanceof NodeSSK);
+ fireRequestSenderFinished(code);
+ }
synchronized(this) {
opennetFinished = true;
@@ -852,4 +867,72 @@
public boolean isLocalRequestSearch() {
return (source==null);
}
+
+ interface Listener {
+ void onReceivedRejectOverload();
+ void onCHKTransferBegins();
+ void onRequestSenderFinished(int status);
+ }
+
+ public void addListener(Listener l) {
+ boolean reject=false;
+ boolean transfer=false;
+ int status;
+ synchronized (this) {
+ synchronized (listeners) {
+ listeners.add(l);
+ }
+ reject=hasForwardedRejectedOverload;
+ transfer=transferStarted();
+ status=this.status;
+ }
+ if (reject)
+ l.onReceivedRejectOverload();
+ if (transfer)
+ l.onCHKTransferBegins();
+ if (status!=NOT_FINISHED)
+ l.onRequestSenderFinished(status);
+ }
+
+ private void fireReceivedRejectOverload() {
+ synchronized (listeners) {
+ Iterator i=listeners.iterator();
+ while (i.hasNext()) {
+ Listener l=(Listener)i.next();
+ try {
+ l.onReceivedRejectOverload();
+ } catch (Throwable t) {
+ Logger.error(this, "Caught: "+t, t);
+ }
+ }
+ }
+ }
+
+ private void fireCHKTransferBegins() {
+ synchronized (listeners) {
+ Iterator i=listeners.iterator();
+ while (i.hasNext()) {
+ Listener l=(Listener)i.next();
+ try {
+ l.onCHKTransferBegins();
+ } catch (Throwable t) {
+ Logger.error(this, "Caught: "+t, t);
+ }
+ }
+ }
+ }
+
+ private void fireRequestSenderFinished(int status) {
+ synchronized (listeners) {
+ Iterator i=listeners.iterator();
+ while (i.hasNext()) {
+ Listener l=(Listener)i.next();
+ try {
+ l.onRequestSenderFinished(status);
+ } catch (Throwable t) {
+ Logger.error(this, "Caught: "+t, t);
+ }
+ }
+ }
+ }
}