Author: nextgens
Date: 2008-07-26 22:44:42 +0000 (Sat, 26 Jul 2008)
New Revision: 21430
Modified:
trunk/freenet/src/freenet/node/RequestHandler.java
Log:
indent
Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java 2008-07-26 22:42:59 UTC
(rev 21429)
+++ trunk/freenet/src/freenet/node/RequestHandler.java 2008-07-26 22:44:42 UTC
(rev 21430)
@@ -33,184 +33,187 @@
public class RequestHandler implements PrioRunnable, ByteCounter,
RequestSender.Listener {
private static boolean logMINOR;
- final Message req;
- final Node node;
- final long uid;
- private short htl;
- final PeerNode source;
- private boolean needsPubKey;
- final Key key;
- private boolean finalTransferFailed = false;
- /** The RequestSender, if any */
- private RequestSender rs;
- private int status = RequestSender.NOT_FINISHED;
- private boolean appliedByteCounts=false;
+ final Message req;
+ final Node node;
+ final long uid;
+ private short htl;
+ final PeerNode source;
+ private boolean needsPubKey;
+ final Key key;
+ private boolean finalTransferFailed = false;
+ /** The RequestSender, if any */
+ private RequestSender rs;
+ private int status = RequestSender.NOT_FINISHED;
+ private boolean appliedByteCounts = false;
private boolean sentRejectedOverload = false;
private long searchStartTime;
private long responseDeadline;
- private BlockTransmitter bt;
-
- public String toString() {
- return super.toString()+" for "+uid;
- }
-
- public RequestHandler(Message m, PeerNode source, long id, Node n, short
htl, Key key) {
- req = m;
- node = n;
- uid = id;
- this.source = source;
- this.htl = htl;
- if(htl <= 0) htl = 1;
- this.key = key;
- if(key instanceof NodeSSK)
- needsPubKey = m.getBoolean(DMT.NEED_PUB_KEY);
- logMINOR = Logger.shouldLog(Logger.MINOR, this);
- receivedBytes(m.receivedByteCount());
- }
+ private BlockTransmitter bt;
- public void run() {
- freenet.support.Logger.OSThread.logPID(this);
- try {
- realRun();
- //The last thing that realRun() does is register as a
request-sender listener, so any exception here is the end.
- } catch (NotConnectedException e) {
- Logger.normal(this, "requestor gone, could not start request
handler wait");
+ public String toString() {
+ return super.toString() + " for " + uid;
+ }
+
+ public RequestHandler(Message m, PeerNode source, long id, Node n,
short htl, Key key) {
+ req = m;
+ node = n;
+ uid = id;
+ this.source = source;
+ this.htl = htl;
+ if(htl <= 0)
+ htl = 1;
+ this.key = key;
+ if(key instanceof NodeSSK)
+ needsPubKey = m.getBoolean(DMT.NEED_PUB_KEY);
+ logMINOR = Logger.shouldLog(Logger.MINOR, this);
+ receivedBytes(m.receivedByteCount());
+ }
+
+ public void run() {
+ freenet.support.Logger.OSThread.logPID(this);
+ try {
+ realRun();
+ //The last thing that realRun() does is register as a
request-sender listener, so any exception here is the end.
+ } catch(NotConnectedException e) {
+ Logger.normal(this, "requestor gone, could not start
request handler wait");
node.removeTransferringRequestHandler(uid);
- node.unlockUID(uid, key instanceof NodeSSK, false, false, false,
false);
- } catch (Throwable t) {
- Logger.error(this, "Caught "+t, t);
+ node.unlockUID(uid, key instanceof NodeSSK, false,
false, false, false);
+ } catch(Throwable t) {
+ Logger.error(this, "Caught " + t, t);
node.removeTransferringRequestHandler(uid);
- node.unlockUID(uid, key instanceof NodeSSK, false, false, false,
false);
- }
- }
-
+ node.unlockUID(uid, key instanceof NodeSSK, false,
false, false, false);
+ }
+ }
private Exception previousApplyByteCountCall;
-
- private void applyByteCounts() {
- if(disconnected) {
- Logger.normal(this, "Not applying byte counts as request source
disconnected during receive");
- return;
- }
- if (appliedByteCounts) {
+
+ private void applyByteCounts() {
+ if(disconnected) {
+ Logger.normal(this, "Not applying byte counts as
request source disconnected during receive");
+ return;
+ }
+ if(appliedByteCounts) {
Logger.error(this, "applyByteCounts already called",
new Exception("error"));
Logger.error(this, "first called here",
previousApplyByteCountCall);
return;
}
- previousApplyByteCountCall=new Exception("first call to
applyByteCounts");
- appliedByteCounts=true;
- if((!finalTransferFailed) && rs != null && status !=
RequestSender.TIMED_OUT && status != RequestSender.GENERATED_REJECTED_OVERLOAD
- && status != RequestSender.INTERNAL_ERROR) {
- int sent, rcvd;
- synchronized(bytesSync) {
- sent = sentBytes;
- rcvd = receivedBytes;
- }
- sent += rs.getTotalSentBytes();
- rcvd += rs.getTotalReceivedBytes();
- if(key instanceof NodeSSK) {
- if(logMINOR) Logger.minor(this, "Remote SSK fetch cost
"+sent+ '/' +rcvd+" bytes ("+status+ ')');
-
node.nodeStats.remoteSskFetchBytesSentAverage.report(sent);
-
node.nodeStats.remoteSskFetchBytesReceivedAverage.report(rcvd);
- if(status == RequestSender.SUCCESS) {
- // Can report both parts, because we had both a
Handler and a Sender
-
node.nodeStats.successfulSskFetchBytesSentAverage.report(sent);
-
node.nodeStats.successfulSskFetchBytesReceivedAverage.report(rcvd);
- }
- } else {
- if(logMINOR) Logger.minor(this, "Remote CHK fetch cost
"+sent+ '/' +rcvd+" bytes ("+status+ ')');
-
node.nodeStats.remoteChkFetchBytesSentAverage.report(sent);
-
node.nodeStats.remoteChkFetchBytesReceivedAverage.report(rcvd);
- if(status == RequestSender.SUCCESS) {
- // Can report both parts, because we had both a
Handler and a Sender
-
node.nodeStats.successfulChkFetchBytesSentAverage.report(sent);
-
node.nodeStats.successfulChkFetchBytesReceivedAverage.report(rcvd);
- }
- }
- }
- }
+ previousApplyByteCountCall = new Exception("first call to
applyByteCounts");
+ appliedByteCounts = true;
+ if((!finalTransferFailed) && rs != null && status !=
RequestSender.TIMED_OUT && status != RequestSender.GENERATED_REJECTED_OVERLOAD
&& status != RequestSender.INTERNAL_ERROR) {
+ int sent, rcvd;
+ synchronized(bytesSync) {
+ sent = sentBytes;
+ rcvd = receivedBytes;
+ }
+ sent += rs.getTotalSentBytes();
+ rcvd += rs.getTotalReceivedBytes();
+ if(key instanceof NodeSSK) {
+ if(logMINOR)
+ Logger.minor(this, "Remote SSK fetch
cost " + sent + '/' + rcvd + " bytes (" + status + ')');
+
node.nodeStats.remoteSskFetchBytesSentAverage.report(sent);
+
node.nodeStats.remoteSskFetchBytesReceivedAverage.report(rcvd);
+ if(status == RequestSender.SUCCESS) {
+ // Can report both parts, because we
had both a Handler and a Sender
+
node.nodeStats.successfulSskFetchBytesSentAverage.report(sent);
+
node.nodeStats.successfulSskFetchBytesReceivedAverage.report(rcvd);
+ }
+ } else {
+ if(logMINOR)
+ Logger.minor(this, "Remote CHK fetch
cost " + sent + '/' + rcvd + " bytes (" + status + ')');
+
node.nodeStats.remoteChkFetchBytesSentAverage.report(sent);
+
node.nodeStats.remoteChkFetchBytesReceivedAverage.report(rcvd);
+ if(status == RequestSender.SUCCESS) {
+ // Can report both parts, because we
had both a Handler and a Sender
+
node.nodeStats.successfulChkFetchBytesSentAverage.report(sent);
+
node.nodeStats.successfulChkFetchBytesReceivedAverage.report(rcvd);
+ }
+ }
+ }
+ }
- private void realRun() throws NotConnectedException {
- if(logMINOR) Logger.minor(this, "Handling a request: "+uid);
-
- Message accepted = DMT.createFNPAccepted(uid);
- source.sendAsync(accepted, null, 0, this);
-
- Object o = node.makeRequestSender(key, htl, uid, source, false, true,
false, false);
- if(o instanceof KeyBlock) {
- returnLocalData((KeyBlock)o);
- return;
- }
-
- if(o == null) { // ran out of htl?
- Message dnf = DMT.createFNPDataNotFound(uid);
- status = RequestSender.DATA_NOT_FOUND; // for byte logging
- node.failureTable.onFinalFailure(key, null, htl,
FailureTable.REJECT_TIME, source);
- sendTerminal(dnf);
- return;
- } else {
- long queueTime = source.getProbableSendQueueTime();
- synchronized(this) {
- rs = (RequestSender) o;
- //If we cannot respond before this time, the 'source' node has
already fatally timed out (and we need not return packets which will not be
claimed)
- searchStartTime = System.currentTimeMillis();
- responseDeadline = searchStartTime +
RequestSender.FETCH_TIMEOUT + queueTime;
- }
- rs.addListener(this);
- }
+ private void realRun() throws NotConnectedException {
+ if(logMINOR)
+ Logger.minor(this, "Handling a request: " + uid);
+
+ Message accepted = DMT.createFNPAccepted(uid);
+ source.sendAsync(accepted, null, 0, this);
+
+ Object o = node.makeRequestSender(key, htl, uid, source, false,
true, false, false);
+ if(o instanceof KeyBlock) {
+ returnLocalData((KeyBlock) o);
+ return;
+ }
+
+ if(o == null) { // ran out of htl?
+ Message dnf = DMT.createFNPDataNotFound(uid);
+ status = RequestSender.DATA_NOT_FOUND; // for byte
logging
+ node.failureTable.onFinalFailure(key, null, htl,
FailureTable.REJECT_TIME, source);
+ sendTerminal(dnf);
+ return;
+ } else {
+ long queueTime = source.getProbableSendQueueTime();
+ synchronized(this) {
+ rs = (RequestSender) o;
+ //If we cannot respond before this time, the
'source' node has already fatally timed out (and we need not return packets
which will not be claimed)
+ searchStartTime = System.currentTimeMillis();
+ responseDeadline = searchStartTime +
RequestSender.FETCH_TIMEOUT + queueTime;
+ }
+ rs.addListener(this);
+ }
}
-
+
public void onReceivedRejectOverload() {
try {
- if(!sentRejectedOverload) {
- // Forward RejectedOverload
+ if(!sentRejectedOverload) {
+ // Forward RejectedOverload
//Note: This message is only decernable from
the terminal messages by the IS_LOCAL flag being false. (!IS_LOCAL)->!Terminal
- Message msg = DMT.createFNPRejectedOverload(uid, false);
- source.sendAsync(msg, null, 0, this);
+ Message msg =
DMT.createFNPRejectedOverload(uid, false);
+ source.sendAsync(msg, null, 0, this);
//If the status changes (e.g. to SUCCESS),
there is little need to send yet another reject overload.
- sentRejectedOverload=true;
- }
- } catch (NotConnectedException e) {
+ sentRejectedOverload = true;
+ }
+ } catch(NotConnectedException e) {
Logger.normal(this, "requestor is gone, can't forward
reject overload");
}
}
-
private boolean disconnected = false;
-
+
public void onCHKTransferBegins() {
try {
- // Is a CHK.
- Message df = DMT.createFNPCHKDataFound(uid, rs.getHeaders());
- source.sendAsync(df, null, 0, this);
-
- PartiallyReceivedBlock prb = rs.getPRB();
- bt =
- new BlockTransmitter(node.usm, source, uid, prb, this);
- node.addTransferringRequestHandler(uid);
+ // Is a CHK.
+ Message df = DMT.createFNPCHKDataFound(uid,
rs.getHeaders());
+ source.sendAsync(df, null, 0, this);
+
+ PartiallyReceivedBlock prb = rs.getPRB();
+ bt =
+ new BlockTransmitter(node.usm, source, uid,
prb, this);
+ node.addTransferringRequestHandler(uid);
bt.sendAsync(node.executor);
- } catch (NotConnectedException e) {
+ } catch(NotConnectedException e) {
synchronized(this) {
disconnected = true;
}
Logger.normal(this, "requestor is gone, can't begin CHK
transfer");
}
}
-
+
private void waitAndFinishCHKTransferOffThread() {
node.executor.execute(new Runnable() {
+
public void run() {
try {
waitAndFinishCHKTransfer();
- } catch (NotConnectedException e) {
+ } catch(NotConnectedException e) {
//for byte logging, since the block is
the 'terminal' message.
applyByteCounts();
unregisterRequestHandlerWithNode();
}
}
- }, "Finish CHK transfer for "+key);
+ }, "Finish CHK transfer for " + key);
}
-
+
private void waitAndFinishCHKTransfer() throws NotConnectedException {
- if (logMINOR) Logger.minor(this, "Waiting for CHK transfer to
finish");
+ if(logMINOR)
+ Logger.minor(this, "Waiting for CHK transfer to
finish");
if(bt.getAsyncExitStatus()) {
status = rs.getStatus();
// Successful CHK transfer, maybe path fold
@@ -223,120 +226,115 @@
unregisterRequestHandlerWithNode();
}
}
-
+
public void onRequestSenderFinished(int status) {
long now = System.currentTimeMillis();
- this.status=status;
-
+ this.status = status;
+
boolean tooLate;
synchronized(this) {
tooLate = responseDeadline > 0 && now >
responseDeadline;
}
-
- if (tooLate) {
+
+ if(tooLate) {
// Offer the data if there is any.
node.failureTable.onFinalFailure(key, null, htl, -1,
source);
PeerNode routedLast = rs == null ? null :
rs.routedLast();
// A certain number of these are normal.
- Logger.normal(this, "requestsender took too long to
respond to requestor ("+TimeUtil.formatTime((now - searchStartTime), 2,
true)+"/"+(rs == null ? "null" : rs.getStatusString())+") routed to
"+(routedLast == null ? "null" : routedLast.shortToString()));
+ Logger.normal(this, "requestsender took too long to
respond to requestor (" + TimeUtil.formatTime((now - searchStartTime), 2, true)
+ "/" + (rs == null ? "null" : rs.getStatusString()) + ") routed to " +
(routedLast == null ? "null" : routedLast.shortToString()));
applyByteCounts();
unregisterRequestHandlerWithNode();
return;
}
-
+
if(status == RequestSender.NOT_FINISHED)
Logger.error(this, "onFinished() but not finished?");
-
+
try {
- switch(status) {
- case RequestSender.NOT_FINISHED:
- case RequestSender.DATA_NOT_FOUND:
- Message dnf = DMT.createFNPDataNotFound(uid);
- sendTerminal(dnf);
- return;
- case RequestSender.RECENTLY_FAILED:
- Message rf = DMT.createFNPRecentlyFailed(uid,
rs.getRecentlyFailedTimeLeft());
- sendTerminal(rf);
- return;
- case RequestSender.GENERATED_REJECTED_OVERLOAD:
- case RequestSender.TIMED_OUT:
- case RequestSender.INTERNAL_ERROR:
- // Locally generated.
- // Propagate back to source who needs to reduce send rate
+ switch(status) {
+ case RequestSender.NOT_FINISHED:
+ case RequestSender.DATA_NOT_FOUND:
+ Message dnf =
DMT.createFNPDataNotFound(uid);
+ sendTerminal(dnf);
+ return;
+ case RequestSender.RECENTLY_FAILED:
+ Message rf =
DMT.createFNPRecentlyFailed(uid, rs.getRecentlyFailedTimeLeft());
+ sendTerminal(rf);
+ return;
+ case RequestSender.GENERATED_REJECTED_OVERLOAD:
+ case RequestSender.TIMED_OUT:
+ case RequestSender.INTERNAL_ERROR:
+ // Locally generated.
+ // Propagate back to source who needs
to reduce send rate
///@bug: we may not want to translate
fatal timeouts into non-fatal timeouts.
- Message reject = DMT.createFNPRejectedOverload(uid, true);
- sendTerminal(reject);
- return;
- case RequestSender.ROUTE_NOT_FOUND:
- // Tell source
- Message rnf = DMT.createFNPRouteNotFound(uid, rs.getHTL());
- sendTerminal(rnf);
- return;
- case RequestSender.SUCCESS:
- if(key instanceof NodeSSK) {
- sendSSK(rs.getHeaders(), rs.getSSKData(),
needsPubKey, ((NodeSSK)rs.getSSKBlock().getKey()).getPubKey());
- } else {
- if(bt == null && !disconnected) {
- // Bug! This is impossible!
- Logger.error(this, "Status is SUCCESS
but we never started a transfer on "+uid);
- // Obviously this node is confused,
send a terminal reject to make sure the requestor is not waiting forever.
- reject = DMT.createFNPRejectedOverload(uid, true);
- sendTerminal(reject);
- } else if(!disconnected) {
- waitAndFinishCHKTransferOffThread();
- }
- }
+ Message reject =
DMT.createFNPRejectedOverload(uid, true);
+ sendTerminal(reject);
return;
- case RequestSender.VERIFY_FAILURE:
- case RequestSender.GET_OFFER_VERIFY_FAILURE:
- if(key instanceof NodeCHK) {
+ case RequestSender.ROUTE_NOT_FOUND:
+ // Tell source
+ Message rnf =
DMT.createFNPRouteNotFound(uid, rs.getHTL());
+ sendTerminal(rnf);
+ return;
+ case RequestSender.SUCCESS:
+ if(key instanceof NodeSSK)
+ sendSSK(rs.getHeaders(),
rs.getSSKData(), needsPubKey, ((NodeSSK)
rs.getSSKBlock().getKey()).getPubKey());
+ else
if(bt == null && !disconnected)
{
- // Bug! This is impossible!
- Logger.error(this, "Status is
VERIFY_FAILURE but we never started a transfer on "+uid);
+ // Bug! This is
impossible!
+ Logger.error(this,
"Status is SUCCESS but we never started a transfer on " + uid);
// Obviously this node
is confused, send a terminal reject to make sure the requestor is not waiting
forever.
- reject = DMT.createFNPRejectedOverload(uid, true);
- sendTerminal(reject);
- } else if(!disconnected) {
+ reject =
DMT.createFNPRejectedOverload(uid, true);
+ sendTerminal(reject);
+ } else if(!disconnected)
+
waitAndFinishCHKTransferOffThread();
+ return;
+ case RequestSender.VERIFY_FAILURE:
+ case RequestSender.GET_OFFER_VERIFY_FAILURE:
+ if(key instanceof NodeCHK) {
+ if(bt == null && !disconnected)
{
+ // Bug! This is
impossible!
+ Logger.error(this,
"Status is VERIFY_FAILURE but we never started a transfer on " + uid);
+ // Obviously this node
is confused, send a terminal reject to make sure the requestor is not waiting
forever.
+ reject =
DMT.createFNPRejectedOverload(uid, true);
+ sendTerminal(reject);
+ } else if(!disconnected)
//Verify fails after
receive() is complete, so we might as well propagate it...
- waitAndFinishCHKTransferOffThread();
- }
+
waitAndFinishCHKTransferOffThread();
return;
- }
- reject = DMT.createFNPRejectedOverload(uid, true);
- sendTerminal(reject);
- return;
- case RequestSender.TRANSFER_FAILED:
- case RequestSender.GET_OFFER_TRANSFER_FAILED:
- if(key instanceof NodeCHK) {
- if(bt == null && !disconnected) {
- // Bug! This is impossible!
- Logger.error(this, "Status is
TRANSFER_FAILED but we never started a transfer on "+uid);
+ }
+ reject =
DMT.createFNPRejectedOverload(uid, true);
+ sendTerminal(reject);
+ return;
+ case RequestSender.TRANSFER_FAILED:
+ case RequestSender.GET_OFFER_TRANSFER_FAILED:
+ if(key instanceof NodeCHK) {
+ if(bt == null && !disconnected)
{
+ // Bug! This is
impossible!
+ Logger.error(this,
"Status is TRANSFER_FAILED but we never started a transfer on " + uid);
// Obviously this node
is confused, send a terminal reject to make sure the requestor is not waiting
forever.
- reject = DMT.createFNPRejectedOverload(uid, true);
- sendTerminal(reject);
- } else if(!disconnected) {
- waitAndFinishCHKTransferOffThread();
- }
+ reject =
DMT.createFNPRejectedOverload(uid, true);
+ sendTerminal(reject);
+ } else if(!disconnected)
+
waitAndFinishCHKTransferOffThread();
return;
- }
- Logger.error(this, "finish(TRANSFER_FAILED) should not
be called on SSK?!?!", new Exception("error"));
- return;
- default:
- // Treat as internal error
- reject = DMT.createFNPRejectedOverload(uid, true);
- sendTerminal(reject);
- throw new IllegalStateException("Unknown status code
"+status);
- }
- } catch (NotConnectedException e) {
+ }
+ Logger.error(this,
"finish(TRANSFER_FAILED) should not be called on SSK?!?!", new
Exception("error"));
+ return;
+ default:
+ // Treat as internal error
+ reject =
DMT.createFNPRejectedOverload(uid, true);
+ sendTerminal(reject);
+ throw new
IllegalStateException("Unknown status code " + status);
+ }
+ } catch(NotConnectedException e) {
Logger.normal(this, "requestor is gone, can't send
terminal message");
applyByteCounts();
unregisterRequestHandlerWithNode();
}
}
-
public static boolean SEND_OLD_FORMAT_SSK = false;
-
- private void sendSSK(byte[] headers, final byte[] data, boolean
needsPubKey2, DSAPublicKey pubKey) throws NotConnectedException {
+
+ private void sendSSK(byte[] headers, final byte[] data, boolean
needsPubKey2, DSAPublicKey pubKey) throws NotConnectedException {
// SUCCESS requires that BOTH the pubkey AND the data/headers
have been received.
// The pubKey will have been set on the SSK key, and the
SSKBlock will have been constructed.
Message headersMsg = DMT.createFNPSSKDataFoundHeaders(uid,
headers);
@@ -350,22 +348,21 @@
public void run() {
try {
- source.sendThrottledMessage(dataMsg,
data.length, RequestHandler.this, 60*1000, true);
+ source.sendThrottledMessage(dataMsg,
data.length, RequestHandler.this, 60 * 1000, true);
applyByteCounts();
- } catch (NotConnectedException e) {
+ } catch(NotConnectedException e) {
// Okay
- } catch (WaitedTooLongException e) {
+ } catch(WaitedTooLongException e) {
// Grrrr
- Logger.error(this, "Waited too long to
send SSK data on "+RequestHandler.this+" because of bwlimiting");
- } catch (SyncSendWaitedTooLongException e) {
- Logger.error(this, "Waited too long to
send SSK data on "+RequestHandler.this+" because of peer");
+ Logger.error(this, "Waited too long to
send SSK data on " + RequestHandler.this + " because of bwlimiting");
+ } catch(SyncSendWaitedTooLongException e) {
+ Logger.error(this, "Waited too long to
send SSK data on " + RequestHandler.this + " because of peer");
} finally {
unregisterRequestHandlerWithNode();
}
}
-
- }, "Send throttled SSK data for "+RequestHandler.this);
-
+ }, "Send throttled SSK data for " + RequestHandler.this);
+
if(SEND_OLD_FORMAT_SSK) {
Message df = DMT.createFNPSSKDataFound(uid, headers,
data);
source.sendAsync(df, null, 0, this);
@@ -378,19 +375,19 @@
}
}
- static void sendSSK(byte[] headers, byte[] data, boolean needsPubKey,
DSAPublicKey pubKey, final PeerNode source, long uid, ByteCounter ctr) throws
NotConnectedException, WaitedTooLongException {
+ static void sendSSK(byte[] headers, byte[] data, boolean needsPubKey,
DSAPublicKey pubKey, final PeerNode source, long uid, ByteCounter ctr) throws
NotConnectedException, WaitedTooLongException {
// SUCCESS requires that BOTH the pubkey AND the data/headers
have been received.
// The pubKey will have been set on the SSK key, and the
SSKBlock will have been constructed.
Message headersMsg = DMT.createFNPSSKDataFoundHeaders(uid,
headers);
source.sendAsync(headersMsg, null, 0, ctr);
final Message dataMsg = DMT.createFNPSSKDataFoundData(uid,
data);
try {
- source.sendThrottledMessage(dataMsg, data.length, ctr,
60*1000, false);
- } catch (SyncSendWaitedTooLongException e) {
+ source.sendThrottledMessage(dataMsg, data.length, ctr,
60 * 1000, false);
+ } catch(SyncSendWaitedTooLongException e) {
// Impossible
throw new Error(e);
}
-
+
if(SEND_OLD_FORMAT_SSK) {
Message df = DMT.createFNPSSKDataFound(uid, headers,
data);
source.sendAsync(df, null, 0, ctr);
@@ -401,248 +398,244 @@
Message pk = DMT.createFNPSSKPubKey(uid, pubKey);
source.sendAsync(pk, null, 0, ctr);
}
- }
-
+ }
+
/**
- * Return data from the datastore.
- * @param block The block we found in the datastore.
- * @throws NotConnectedException If we lose the connected to the request
source.
- */
- private void returnLocalData(KeyBlock block) throws NotConnectedException {
- if(key instanceof NodeSSK) {
- sendSSK(block.getRawHeaders(), block.getRawData(),
needsPubKey, ((SSKBlock)block).getPubKey());
- status = RequestSender.SUCCESS; // for byte logging
- } else if(block instanceof CHKBlock) {
- Message df = DMT.createFNPCHKDataFound(uid,
block.getRawHeaders());
- PartiallyReceivedBlock prb =
- new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK,
Node.PACKET_SIZE, block.getRawData());
- BlockTransmitter bt =
- new BlockTransmitter(node.usm, source, uid, prb, this);
- node.addTransferringRequestHandler(uid);
+ * Return data from the datastore.
+ * @param block The block we found in the datastore.
+ * @throws NotConnectedException If we lose the connected to the
request source.
+ */
+ private void returnLocalData(KeyBlock block) throws
NotConnectedException {
+ if(key instanceof NodeSSK) {
+ sendSSK(block.getRawHeaders(), block.getRawData(),
needsPubKey, ((SSKBlock) block).getPubKey());
+ status = RequestSender.SUCCESS; // for byte logging
+ } else if(block instanceof CHKBlock) {
+ Message df = DMT.createFNPCHKDataFound(uid,
block.getRawHeaders());
+ PartiallyReceivedBlock prb =
+ new
PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, Node.PACKET_SIZE,
block.getRawData());
+ BlockTransmitter bt =
+ new BlockTransmitter(node.usm, source, uid,
prb, this);
+ node.addTransferringRequestHandler(uid);
source.sendAsync(df, null, 0, this);
- if(bt.send(node.executor)) {
- // for byte logging
- status = RequestSender.SUCCESS;
- // We've fetched it from our datastore, so there won't
be a downstream noderef.
- // But we want to send at least an
FNPOpennetCompletedAck, otherwise the request source
- // may have to timeout waiting for one. That will be
the terminal message.
- finishOpennetNoRelay();
+ if(bt.send(node.executor)) {
+ // for byte logging
+ status = RequestSender.SUCCESS;
+ // We've fetched it from our datastore, so
there won't be a downstream noderef.
+ // But we want to send at least an
FNPOpennetCompletedAck, otherwise the request source
+ // may have to timeout waiting for one. That
will be the terminal message.
+ finishOpennetNoRelay();
} else {
- //also for byte logging, since the block is the 'terminal'
message.
- applyByteCounts();
+ //also for byte logging, since the block is the
'terminal' message.
+ applyByteCounts();
unregisterRequestHandlerWithNode();
- }
- } else throw new IllegalStateException();
+ }
+ } else
+ throw new IllegalStateException();
}
private void unregisterRequestHandlerWithNode() {
node.removeTransferringRequestHandler(uid);
node.unlockUID(uid, key instanceof NodeSSK, false, false,
false, false);
}
-
+
/**
- * Sends the 'final' packet of a request in such a way that the thread can
be freed (made non-runnable/exit)
- * and the byte counter will still be accurate.
- */
- private void sendTerminal(Message msg) throws NotConnectedException {
- if(logMINOR)
- Logger.minor(this, "sendTerminal("+msg+")", new
Exception("debug"));
- if (sendTerminalCalled)
- throw new IllegalStateException("sendTerminal should only be
called once");
- else
- sendTerminalCalled=true;
-
- source.sendAsync(msg, new TerminalMessageByteCountCollector(), 0,
this);
- }
-
- boolean sendTerminalCalled=false;
-
- /**
- * Note well! These functions are not executed on the RequestHandler
thread.
- */
- private class TerminalMessageByteCountCollector implements
AsyncMessageCallback {
-
- private boolean completed = false;
-
+ * Sends the 'final' packet of a request in such a way that the thread
can be freed (made non-runnable/exit)
+ * and the byte counter will still be accurate.
+ */
+ private void sendTerminal(Message msg) throws NotConnectedException {
+ if(logMINOR)
+ Logger.minor(this, "sendTerminal(" + msg + ")", new
Exception("debug"));
+ if(sendTerminalCalled)
+ throw new IllegalStateException("sendTerminal should
only be called once");
+ else
+ sendTerminalCalled = true;
+
+ source.sendAsync(msg, new TerminalMessageByteCountCollector(),
0, this);
+ }
+ boolean sendTerminalCalled = false;
+
+ /**
+ * Note well! These functions are not executed on the RequestHandler
thread.
+ */
+ private class TerminalMessageByteCountCollector implements
AsyncMessageCallback {
+
+ private boolean completed = false;
+
public void acknowledged() {
- //terminalMessage ack'd by remote peer
+ //terminalMessage ack'd by remote peer
complete();
}
-
+
public void disconnected() {
- Logger.minor(this, "Peer disconnected before terminal message sent
for "+RequestHandler.this);
- complete();
+ Logger.minor(this, "Peer disconnected before terminal
message sent for " + RequestHandler.this);
+ complete();
}
-
+
public void fatalError() {
Logger.error(this, "Error sending terminal message?!
for " + RequestHandler.this);
complete();
}
-
+
public void sent() {
complete();
- }
-
+ }
+
private void complete() {
synchronized(this) {
- if(completed) return;
+ if(completed)
+ return;
completed = true;
}
- //For byte counting, this relies on the fact that the callback
will only be excuted once.
+ //For byte counting, this relies on the fact that the
callback will only be excuted once.
applyByteCounts();
unregisterRequestHandlerWithNode();
}
}
-
- /**
- * Either send an ack, indicating we've finished and aren't interested in
opennet,
- * or wait for a noderef and relay it and wait for a response and relay
that,
- * or send our own noderef and wait for a response and add that.
- */
+
+ /**
+ * Either send an ack, indicating we've finished and aren't interested
in opennet,
+ * or wait for a noderef and relay it and wait for a response and relay
that,
+ * or send our own noderef and wait for a response and add that.
+ */
private void finishOpennetChecked() throws NotConnectedException {
OpennetManager om = node.getOpennet();
if(om != null &&
- (node.passOpennetRefsThroughDarknet() ||
source.isOpennet()) &&
- finishOpennetInner(om)) {
+ (node.passOpennetRefsThroughDarknet() ||
source.isOpennet()) &&
+ finishOpennetInner(om)) {
applyByteCounts();
unregisterRequestHandlerWithNode();
return;
}
-
+
Message msg = DMT.createFNPOpennetCompletedAck(uid);
sendTerminal(msg);
}
-
+
/**
* There is no noderef to pass downstream. If we want a connection,
send our
* noderef and wait for a reply, otherwise just send an ack.
*/
private void finishOpennetNoRelay() throws NotConnectedException {
OpennetManager om = node.getOpennet();
-
+
if(om != null && (source.isOpennet() ||
node.passOpennetRefsThroughDarknet()) &&
- finishOpennetNoRelayInner(om)) {
+ finishOpennetNoRelayInner(om)) {
applyByteCounts();
unregisterRequestHandlerWithNode();
return;
}
-
+
// Otherwise just ack it.
Message msg = DMT.createFNPOpennetCompletedAck(uid);
sendTerminal(msg);
}
-
+
private boolean finishOpennetInner(OpennetManager om) {
byte[] noderef = rs.waitForOpennetNoderef();
- if(noderef == null) {
+ if(noderef == null)
return finishOpennetNoRelayInner(om);
- }
-
- if(node.random.nextInt(OpennetManager.RESET_PATH_FOLDING_PROB)
== 0) {
+
+ if(node.random.nextInt(OpennetManager.RESET_PATH_FOLDING_PROB)
== 0)
return finishOpennetNoRelayInner(om);
- }
-
- finishOpennetRelay(noderef, om);
- return true;
- }
+ finishOpennetRelay(noderef, om);
+ return true;
+ }
+
/**
* Send our noderef to the request source, wait for a reply, if we get
one add it. Called when either the request
* wasn't routed, or the node it was routed to didn't return a noderef.
* @return True if success, or lost connection; false if we need to
send an ack.
*/
- private boolean finishOpennetNoRelayInner(OpennetManager om) {
- if(logMINOR)
- Logger.minor(this, "Finishing opennet: sending own reference");
- if(!om.wantPeer(null, false)) return false; // Don't want a
reference
-
+ private boolean finishOpennetNoRelayInner(OpennetManager om) {
+ if(logMINOR)
+ Logger.minor(this, "Finishing opennet: sending own
reference");
+ if(!om.wantPeer(null, false))
+ return false; // Don't want a reference
+
try {
om.sendOpennetRef(false, uid, source,
om.crypto.myCompressedFullRef(), this);
- } catch (NotConnectedException e) {
- Logger.normal(this, "Can't send opennet ref because
node disconnected on "+this);
+ } catch(NotConnectedException e) {
+ Logger.normal(this, "Can't send opennet ref because
node disconnected on " + this);
// Oh well...
return true;
}
-
+
// Wait for response
-
- byte[] noderef =
- om.waitForOpennetNoderef(true, source, uid, this);
-
+
+ byte[] noderef = om.waitForOpennetNoderef(true, source, uid,
this);
+
if(noderef == null)
return false;
-
+
SimpleFieldSet ref = om.validateNoderef(noderef, 0,
noderef.length, source, false);
-
- if(ref == null)
+
+ if(ref == null)
return false;
-
- try {
- if(node.addNewOpennetNode(ref) == null) {
- Logger.normal(this, "Asked for opennet ref but
didn't want it for "+this+" :\n"+ref);
- } else {
- Logger.normal(this, "Added opennet noderef in
"+this);
- }
- } catch (FSParseException e) {
- Logger.error(this, "Could not parse opennet noderef for
"+this+" from "+source, e);
- } catch (PeerParseException e) {
- Logger.error(this, "Could not parse opennet noderef for
"+this+" from "+source, e);
- } catch (ReferenceSignatureVerificationException e) {
- Logger.error(this, "Bad signature on opennet noderef
for "+this+" from "+source+" : "+e, e);
+
+ try {
+ if(node.addNewOpennetNode(ref) == null)
+ Logger.normal(this, "Asked for opennet ref but
didn't want it for " + this + " :\n" + ref);
+ else
+ Logger.normal(this, "Added opennet noderef in "
+ this);
+ } catch(FSParseException e) {
+ Logger.error(this, "Could not parse opennet noderef for
" + this + " from " + source, e);
+ } catch(PeerParseException e) {
+ Logger.error(this, "Could not parse opennet noderef for
" + this + " from " + source, e);
+ } catch(ReferenceSignatureVerificationException e) {
+ Logger.error(this, "Bad signature on opennet noderef
for " + this + " from " + source + " : " + e, e);
}
return true;
- }
+ }
- /**
- * Called when the node we routed the request to returned a valid noderef,
and we don't want it.
- * So we relay it downstream to somebody who does, and wait to relay the
response back upstream.
- * @param noderef
- * @param om
- */
+ /**
+ * Called when the node we routed the request to returned a valid
noderef, and we don't want it.
+ * So we relay it downstream to somebody who does, and wait to relay
the response back upstream.
+ * @param noderef
+ * @param om
+ */
private void finishOpennetRelay(byte[] noderef, OpennetManager om) {
- if(logMINOR)
- Logger.minor(this, "Finishing opennet: relaying reference from
"+rs.successFrom());
+ if(logMINOR)
+ Logger.minor(this, "Finishing opennet: relaying
reference from " + rs.successFrom());
// Send it back to the handler, then wait for the ConnectReply
PeerNode dataSource = rs.successFrom();
-
+
try {
om.sendOpennetRef(false, uid, source, noderef, this);
- } catch (NotConnectedException e) {
+ } catch(NotConnectedException e) {
// Lost contact with request source, nothing we can do
return;
}
-
+
// Now wait for reply from the request source.
-
+
byte[] newNoderef = om.waitForOpennetNoderef(true, source, uid,
this);
-
- if(newNoderef == null) {
+
+ if(newNoderef == null)
// Already sent a ref, no way to tell upstream that we
didn't receive one. :(
return;
- }
-
+
// Send it forward to the data source, if it is valid.
-
- if(om.validateNoderef(newNoderef, 0, newNoderef.length, source,
false) != null) {
+
+ if(om.validateNoderef(newNoderef, 0, newNoderef.length, source,
false) != null)
try {
om.sendOpennetRef(true, uid, dataSource,
newNoderef, this);
- } catch (NotConnectedException e) {
+ } catch(NotConnectedException e) {
// How sad
return;
}
- }
}
-
private int sentBytes;
private int receivedBytes;
private volatile Object bytesSync = new Object();
-
+
public void sentBytes(int x) {
synchronized(bytesSync) {
sentBytes += x;
}
node.nodeStats.requestSentBytes(key instanceof NodeSSK, x);
- if(logMINOR) Logger.minor(this, "sentBytes("+x+") on "+this);
+ if(logMINOR)
+ Logger.minor(this, "sentBytes(" + x + ") on " + this);
}
public void receivedBytes(int x) {
@@ -660,11 +653,11 @@
*/
node.sentPayload(x);
node.nodeStats.requestSentBytes(key instanceof NodeSSK, -x);
- if(logMINOR) Logger.minor(this, "sentPayload("+x+") on "+this);
+ if(logMINOR)
+ Logger.minor(this, "sentPayload(" + x + ") on " + this);
}
public int getPriority() {
return NativeThread.HIGH_PRIORITY;
}
-
}