Author: toad
Date: 2008-01-29 23:06:14 +0000 (Tue, 29 Jan 2008)
New Revision: 17405
Added:
trunk/freenet/src/freenet/node/CHKInsertHandler.java
Removed:
trunk/freenet/src/freenet/node/InsertHandler.java
Modified:
trunk/freenet/src/freenet/node/CHKInsertSender.java
trunk/freenet/src/freenet/node/NodeDispatcher.java
Log:
Rename InsertHandler -> CHKInsertHandler
Copied: trunk/freenet/src/freenet/node/CHKInsertHandler.java (from rev 17399,
trunk/freenet/src/freenet/node/InsertHandler.java)
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertHandler.java
(rev 0)
+++ trunk/freenet/src/freenet/node/CHKInsertHandler.java 2008-01-29
23:06:14 UTC (rev 17405)
@@ -0,0 +1,492 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.node;
+
+import freenet.io.comm.ByteCounter;
+import freenet.io.comm.DMT;
+import freenet.io.comm.DisconnectedException;
+import freenet.io.comm.Message;
+import freenet.io.comm.MessageFilter;
+import freenet.io.comm.NotConnectedException;
+import freenet.io.comm.RetrievalException;
+import freenet.io.xfer.AbortedException;
+import freenet.io.xfer.BlockReceiver;
+import freenet.io.xfer.PartiallyReceivedBlock;
+import freenet.keys.CHKBlock;
+import freenet.keys.CHKVerifyException;
+import freenet.keys.NodeCHK;
+import freenet.support.HexUtil;
+import freenet.support.Logger;
+import freenet.support.OOMHandler;
+import freenet.support.ShortBuffer;
+
+/**
+ * @author amphibian
+ *
+ * Handle an incoming insert request.
+ * This corresponds to RequestHandler.
+ */
+public class CHKInsertHandler implements Runnable, ByteCounter {
+
+
+ static final int DATA_INSERT_TIMEOUT = 10000; // timeout given to the DataInsert MessageFilter in realRun(); presumably milliseconds - TODO confirm
+
+ final Message req; // the incoming insert request
+ final Node node;
+ final long uid; // request id: used in every reply and unlocked in run()
+ final PeerNode source; // peer that sent the request; all replies go to it
+ final NodeCHK key; // routing key read from the request
+ final long startTime; // supplied by the caller; compared with the connection time of the peer when no DataInsert arrives
+ private double closestLoc; // from the request; replaced by our own location when we are closer to the key
+ private short htl; // from the request; reset to node.maxHTL() when we are closer to the key
+ private CHKInsertSender sender; // assigned in realRun() only when htl > 0, otherwise stays null
+ private byte[] headers; // block headers taken from the DataInsert message
+ private BlockReceiver br; // receives the block data from the source into prb
+ private Thread runThread; // thread executing realRun(); interrupted by DataReceiver when the receive fails
+ PartiallyReceivedBlock prb;
+ private static boolean logMINOR; // static, but re-evaluated by every constructor call
+
+ CHKInsertHandler(Message req, PeerNode source, long id, Node node, long
startTime) { // Captures the request fields; adjusts closestLoc and htl when this node is closer to the key.
+ this.req = req;
+ this.node = node;
+ this.uid = id;
+ this.source = source;
+ this.startTime = startTime;
+ key = (NodeCHK) req.getObject(DMT.FREENET_ROUTING_KEY);
+ htl = req.getShort(DMT.HTL);
+ closestLoc = req.getDouble(DMT.NEAREST_LOCATION);
+ double targetLoc = key.toNormalizedDouble();
+ double myLoc = node.lm.getLocation();
+ if(Location.distance(targetLoc, myLoc) < Location.distance(targetLoc,
closestLoc)) { // this node is strictly closer to the key than the closest location recorded in the request
+ closestLoc = myLoc;
+ htl = node.maxHTL(); // HTL is reset to the maximum in that case
+ }
+ logMINOR = Logger.shouldLog(Logger.MINOR, this);
+ receivedBytes(req.receivedByteCount()); // count the bytes of the request message itself
+ }
+
+ public String toString() { // default Object string followed by the request uid
+ return super.toString()+" for "+uid;
+ }
+
+ public void run() { // Entry point: delegates to realRun(), logs anything thrown, and always unlocks the UID.
+ freenet.support.Logger.OSThread.logPID(this);
+ try {
+ realRun();
+ } catch (OutOfMemoryError e) { // handed to OOMHandler instead of the generic branch below
+ OOMHandler.handleOOM(e);
+ } catch (Throwable t) { // logged, not rethrown
+ Logger.error(this, "Caught in run() "+t, t);
+ } finally { // executed on every exit path, including the early returns inside realRun()
+ if(logMINOR) Logger.minor(this, "Exiting CHKInsertHandler.run()
for "+uid);
+ node.unlockUID(uid, false, true, false);
+ }
+ }
+
+ private void realRun() { // Handles one insert: send Accepted, wait for DataInsert, start the receive, then relay the sender status to the source.
+ runThread = Thread.currentThread(); // remembered so DataReceiver can interrupt the waits below
+
+ // FIXME implement rate limiting or something!
+ // Send Accepted
+ Message accepted = DMT.createFNPAccepted(uid);
+ try {
+ //Using sendSync here will help the next message filter
not timeout... wait here or at the message filter.
+ source.sendSync(accepted, this);
+ } catch (NotConnectedException e1) {
+ if(logMINOR) Logger.minor(this, "Lost connection to
source");
+ return;
+ }
+
+ // Source will send us a DataInsert
+
+ MessageFilter mf;
+ mf =
MessageFilter.create().setType(DMT.FNPDataInsert).setField(DMT.UID,
uid).setSource(source).setTimeout(DATA_INSERT_TIMEOUT);
+
+ Message msg;
+ try {
+ msg = node.usm.waitFor(mf, this); // a null result is treated below as: no DataInsert arrived
+ } catch (DisconnectedException e) {
+ Logger.normal(this, "Disconnected while waiting for DataInsert on
"+uid);
+ return;
+ }
+
+ if(logMINOR) Logger.minor(this, "Received "+msg);
+
+ if(msg == null) { // no DataInsert: reject with a timeout, report transfers completed, abort the block, and stop
+ try { // the error below is logged only when the peer is still connected and startTime is later than its connection time plus 4 * HANDSHAKE_TIMEOUT
+ if(source.isConnected() && (startTime >
(source.timeLastConnectionCompleted()+Node.HANDSHAKE_TIMEOUT*4)))
+ Logger.error(this, "Did not receive DataInsert
on "+uid+" from "+source+" !");
+ Message tooSlow = DMT.createFNPRejectedTimeout(uid);
+ source.sendAsync(tooSlow, null, 0, this);
+ Message m = DMT.createFNPInsertTransfersCompleted(uid,
true);
+ source.sendAsync(m, null, 0, this);
+ prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK,
Node.PACKET_SIZE);
+ br = new BlockReceiver(node.usm, source, uid, prb,
this);
+ prb.abort(RetrievalException.NO_DATAINSERT, "No
DataInsert");
+ br.sendAborted(RetrievalException.NO_DATAINSERT, "No
DataInsert");
+ return;
+ } catch (NotConnectedException e) {
+ if(logMINOR) Logger.minor(this, "Lost connection to
source");
+ return;
+ }
+ }
+
+ // We have a DataInsert
+ headers = ((ShortBuffer)msg.getObject(DMT.BLOCK_HEADERS)).getData();
+ // FIXME check the headers
+
+ // Now create an CHKInsertSender, or use an existing one, or
+ // discover that the data is in the store.
+
+ // From this point onwards, if we return cleanly we must go through
finish().
+
+ prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK,
Node.PACKET_SIZE);
+ if(htl > 0) // only then is a downstream sender created; otherwise sender stays null
+ sender = node.makeInsertSender(key, htl, uid, source, headers,
prb, false, closestLoc, true);
+ br = new BlockReceiver(node.usm, source, uid, prb, this);
+
+ // Receive the data, off thread
+ Runnable dataReceiver = new DataReceiver();
+ receiveStarted = true; // finish() waits for receiveCompleted once this is set
+ node.executor.execute(dataReceiver, "CHKInsertHandler$DataReceiver for
UID "+uid);
+
+ if(htl == 0) { // nothing to forward: reply immediately and let finish() commit the data locally
+ canCommit = true;
+ msg = DMT.createFNPInsertReply(uid);
+ try {
+ source.sendSync(msg, this);
+ } catch (NotConnectedException e) {
+ // Ignore
+ }
+ finish(CHKInsertSender.SUCCESS);
+ return;
+ }
+
+ // Wait...
+ // What do we want to wait for?
+ // If the data receive completes, that's very nice,
+ // but doesn't really matter. What matters is what
+ // happens to the CHKInsertSender. If the data receive
+ // fails, that does matter...
+
+ // We are waiting for a terminal status on the CHKInsertSender,
+ // including REPLIED_WITH_DATA.
+ // If we get transfer failed, we can check whether the receive
+ // failed first. If it did it's not our fault.
+ // If the receive failed, and we haven't started transferring
+ // yet, we probably want to kill the sender.
+ // So we call the wait method on the CHKInsertSender, but we
+ // also have a flag locally to indicate the receive failed.
+ // And if it does, we interrupt.
+
+ boolean receivedRejectedOverload = false; // ensures the non-local RejectedOverload is forwarded at most once
+
+ while(true) { // poll the sender until it reports a status other than NOT_FINISHED or the receive fails
+ synchronized(sender) { // NOTE(review): sender is only assigned when htl > 0; a negative htl would reach this line with sender == null - confirm htl cannot be negative
+ try {
+ if(sender.getStatus() == CHKInsertSender.NOT_FINISHED)
+ sender.wait(5000); // bounded wait, so receiveFailed() is re-checked at least every 5000 ms
+ } catch (InterruptedException e) {
+ // Cool, probably this is because the receive failed...
+ }
+ }
+ if(receiveFailed()) {
+ // Nothing else we can do
+ finish(CHKInsertSender.RECEIVE_FAILED);
+ return;
+ }
+
+ if((!receivedRejectedOverload) &&
sender.receivedRejectedOverload()) {
+ receivedRejectedOverload = true;
+ // Forward it
+ Message m = DMT.createFNPRejectedOverload(uid, false);
+ try {
+ source.sendSync(m, this);
+ } catch (NotConnectedException e) {
+ if(logMINOR) Logger.minor(this, "Lost
connection to source");
+ return; // NOTE(review): returns without calling finish(), unlike the rule stated earlier in this method - confirm this is intended
+ }
+ }
+
+ int status = sender.getStatus();
+
+ if(status == CHKInsertSender.NOT_FINISHED) {
+ continue;
+ }
+
+// // FIXME obviously! For debugging load issues.
+// if(node.myName.equalsIgnoreCase("Toad #1") &&
+// node.random.nextBoolean()) {
+// // Maliciously timeout
+// Logger.error(this, "Maliciously timing out: was
"+sender.getStatusString());
+// sentSuccess = true;
+// return;
+// }
+
+ // Local RejectedOverload's (fatal).
+ // Internal error counts as overload. It'd only create a timeout
otherwise, which is the same thing anyway.
+ // We *really* need a good way to deal with nodes that constantly
R_O!
+ if((status == CHKInsertSender.TIMED_OUT) ||
+ (status == CHKInsertSender.GENERATED_REJECTED_OVERLOAD)
||
+ (status == CHKInsertSender.INTERNAL_ERROR)) {
+ msg = DMT.createFNPRejectedOverload(uid, true);
+ try {
+ source.sendSync(msg, this);
+ } catch (NotConnectedException e) {
+ if(logMINOR) Logger.minor(this, "Lost
connection to source");
+ return;
+ }
+ // Might as well store it anyway.
+ if((status == CHKInsertSender.TIMED_OUT) ||
+ (status ==
CHKInsertSender.GENERATED_REJECTED_OVERLOAD))
+ canCommit = true;
+ finish(status);
+ return;
+ }
+
+ if((status == CHKInsertSender.ROUTE_NOT_FOUND) || (status ==
CHKInsertSender.ROUTE_REALLY_NOT_FOUND)) {
+ msg = DMT.createFNPRouteNotFound(uid, sender.getHTL());
+ try {
+ source.sendSync(msg, this);
+ } catch (NotConnectedException e) {
+ if(logMINOR) Logger.minor(this, "Lost
connection to source");
+ return;
+ }
+ canCommit = true;
+ finish(status);
+ return;
+ }
+
+ if(status == CHKInsertSender.RECEIVE_FAILED) {
+ // Probably source's fault.
+ finish(status);
+ return;
+ }
+
+ if(status == CHKInsertSender.SUCCESS) {
+ msg = DMT.createFNPInsertReply(uid);
+ try {
+ source.sendSync(msg, this);
+ } catch (NotConnectedException e) {
+ Logger.minor(this, "Lost connection to
source");
+ return;
+ }
+ canCommit = true;
+ finish(status);
+ return;
+ }
+
+ // Otherwise...?
+ Logger.error(this, "Unknown status code:
"+sender.getStatusString());
+ msg = DMT.createFNPRejectedOverload(uid, true);
+ try {
+ source.sendSync(msg, this);
+ } catch (NotConnectedException e) {
+ // Ignore
+ }
+ finish(CHKInsertSender.INTERNAL_ERROR);
+ return;
+ }
+ }
+
+ private boolean canCommit = false; // set by realRun() for the outcomes where the data may be stored; checked by maybeCommit()
+ private boolean sentCompletion = false; // set in finish() so the completion message is built at most once
+ private Object sentCompletionLock = new Object(); // guards sentCompletion
+
+ /**
+ * Finish the insert: wait for the data receive to complete, then,
+ * if canCommit, and we have received all the data, and it
+ * verifies, commit it (see maybeCommit()). Afterwards send
+ * FNPInsertTransfersCompleted to the source and, for the codes not
+ * excluded at the end of the method, report byte counts to node.nodeStats.
+ * @param code the CHKInsertSender status code this insert ended with.
+ */
+ private void finish(int code) {
+ if(logMINOR) Logger.minor(this, "Waiting for receive");
+ synchronized(this) {
+ while(receiveStarted && !receiveCompleted) { // loops until DataReceiver sets receiveCompleted; the timeout below only bounds each individual wait
+ try {
+ wait(100*1000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ }
+
+ maybeCommit();
+
+ if(logMINOR) Logger.minor(this, "Waiting for completion");
+ // Wait for completion
+ boolean sentCompletionWasSet;
+ synchronized(sentCompletionLock) { // read the previous value and set the flag in one step
+ sentCompletionWasSet = sentCompletion;
+ sentCompletion = true;
+ }
+
+ Message m=null;
+
+ if((sender != null) && (!sentCompletionWasSet)) {
+ //If there are downstream senders, our final success
report depends on there being no timeouts in the chain.
+ while(true) { // block until the sender reports completed(), re-checking at least every 10 seconds
+ synchronized(sender) {
+ if(sender.completed()) {
+ break;
+ }
+ try {
+ sender.wait(10*1000);
+ } catch (InterruptedException e) {
+ // Loop
+ }
+ }
+ }
+ boolean failed = sender.anyTransfersFailed();
+ m = DMT.createFNPInsertTransfersCompleted(uid, failed);
+ }
+
+ if((sender == null) && (!sentCompletionWasSet) && (canCommit)) {
+ //There are no downstream senders, but we stored the
data locally, report successful transfer.
+ //Note that this is done even if the verify fails.
+ m = DMT.createFNPInsertTransfersCompleted(uid, false /*
no timeouts */);
+ }
+
+ try {
+ source.sendSync(m, this); // NOTE(review): m is still null here when neither branch above assigned it (for example sender == null and canCommit false) - confirm sendSync accepts null
+ if(logMINOR) Logger.minor(this, "Sent completion: "+m+"
for "+this);
+ } catch (NotConnectedException e1) {
+ if(logMINOR) Logger.minor(this, "Not connected:
"+source+" for "+this);
+ // May need to commit anyway...
+ }
+
+ if(code != CHKInsertSender.TIMED_OUT && code !=
CHKInsertSender.GENERATED_REJECTED_OVERLOAD &&
+ code != CHKInsertSender.INTERNAL_ERROR && code !=
CHKInsertSender.ROUTE_REALLY_NOT_FOUND &&
+ code != CHKInsertSender.RECEIVE_FAILED &&
!receiveFailed()) {
+ int totalSent = getTotalSentBytes(); // bytes counted by this handler
+ int totalReceived = getTotalReceivedBytes();
+ if(sender != null) { // add the bytes counted by the downstream sender, if there is one
+ totalSent += sender.getTotalSentBytes();
+ totalReceived += sender.getTotalReceivedBytes();
+ }
+ if(logMINOR) Logger.minor(this, "Remote CHK insert cost
"+totalSent+ '/' +totalReceived+" bytes ("+code+ ") receive failed =
"+receiveFailed());
+
node.nodeStats.remoteChkInsertBytesSentAverage.report(totalSent);
+
node.nodeStats.remoteChkInsertBytesReceivedAverage.report(totalReceived);
+ if(code == CHKInsertSender.SUCCESS) {
+ // Report both sent and received because we have both a
Handler and a Sender
+ if(sender != null && sender.startedSendingData())
+
node.nodeStats.successfulChkInsertBytesSentAverage.report(totalSent);
+
node.nodeStats.successfulChkInsertBytesReceivedAverage.report(totalReceived);
+ }
+ }
+ }
+
+ /**
+ * Verify data, or send DataInsertRejected.
+ * Does nothing when prb is null or aborted, when canCommit is not set,
+ * or when not all of the block has been received. Otherwise builds a
+ * CHKBlock from the received data, the headers and the key, and stores it.
+ * If the try body throws CHKVerifyException, FNPDataInsertRejected
+ * (verify failed) is sent to the source after the synchronized section.
+ */
+ private void maybeCommit() {
+ Message toSend = null; // rejection to send once the lock has been released, if any
+
+ synchronized(this) { // REDFLAG do not use synch(this) for any other
purpose!
+ if((prb == null) || prb.isAborted()) return;
+ try {
+ if(!canCommit) return;
+ if(!prb.allReceived()) return;
+ CHKBlock block = new CHKBlock(prb.getBlock(), headers, key);
+ node.store(block);
+ if(logMINOR) Logger.minor(this, "Committed");
+ } catch (CHKVerifyException e) {
+ Logger.error(this, "Verify failed in CHKInsertHandler: "+e+" -
headers: "+HexUtil.bytesToHex(headers), e);
+ toSend = DMT.createFNPDataInsertRejected(uid,
DMT.DATA_INSERT_REJECTED_VERIFY_FAILED);
+ } catch (AbortedException e) {
+ Logger.error(this, "Receive failed: "+e);
+ // Receiver thread (below) will handle sending the failure
notice
+ }
+ }
+ if(toSend != null) { // sent asynchronously and outside the synchronized section
+ try {
+ source.sendAsync(toSend, null, 0, this);
+ } catch (NotConnectedException e) {
+ // :(
+ if(logMINOR) Logger.minor(this, "Lost connection in "+this+"
when sending FNPDataInsertRejected");
+ }
+ }
+ }
+
+ /** Has the receive failed? If so, there's not much more that can be
done... */
+ private boolean receiveFailed;
+
+ private boolean receiveStarted; // set in realRun() just before the DataReceiver is handed to the executor
+ private boolean receiveCompleted; // set by DataReceiver under synchronized(CHKInsertHandler.this); finish() waits on it
+
+ public class DataReceiver implements Runnable { // Runs br.receive() via node.executor and records the outcome on the enclosing handler.
+
+ public void run() { // Sets receiveCompleted (and receiveFailed on RetrievalException) and wakes threads waiting on the handler.
+ freenet.support.Logger.OSThread.logPID(this);
+ if(logMINOR) Logger.minor(this, "Receiving data for
"+CHKInsertHandler.this);
+ try { // NOTE(review): the catch (Throwable) branch at the end of this try does not set receiveCompleted, so finish() would keep waiting in that case - confirm
+ br.receive();
+ if(logMINOR) Logger.minor(this, "Received data for
"+CHKInsertHandler.this);
+ synchronized(CHKInsertHandler.this) {
+ receiveCompleted = true;
+ CHKInsertHandler.this.notifyAll(); // wakes finish(), which waits on the handler monitor
+ }
+ } catch (RetrievalException e) { // receive failed: flag it, cancel the sender, interrupt the handler thread, then tell the source
+ synchronized(CHKInsertHandler.this) {
+ receiveCompleted = true;
+ receiveFailed = true;
+ CHKInsertHandler.this.notifyAll();
+ }
+ // Cancel the sender
+ if(sender != null)
+ sender.receiveFailed(); // tell it to stop if it hasn't
already failed... unless it's sending from store
+ runThread.interrupt();
+ Message msg = DMT.createFNPDataInsertRejected(uid,
DMT.DATA_INSERT_REJECTED_RECEIVE_FAILED);
+ try {
+ source.sendSync(msg, CHKInsertHandler.this);
+ } catch (NotConnectedException ex) {
+ //If they are not connected, that's
probably why the receive failed!
+ if (logMINOR) Logger.minor(this, "Can't send "+msg+" to
"+source+": "+ex);
+ }
+ if
(e.getReason()==RetrievalException.SENDER_DISCONNECTED)
+ Logger.normal(this, "Failed to retrieve
(disconnect): "+e, e);
+ else
+ Logger.error(this, "Failed to retrieve
("+e.getReason()+"/"+RetrievalException.getErrString(e.getReason())+"): "+e, e);
+ return;
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t, t);
+ }
+ }
+
+ public String toString() {
+ return super.toString()+" for "+uid;
+ }
+
+ }
+
+ private synchronized boolean receiveFailed() { // reads the flag under the handler monitor, the same monitor DataReceiver holds when writing it
+ return receiveFailed;
+ }
+
+ private final Object totalSync = new Object(); // lock taken by sentBytes() and receivedBytes() when updating the counters below
+ private int totalSentBytes;
+ private int totalReceivedBytes;
+
+ public void sentBytes(int x) { // adds x to the sent-bytes total while holding totalSync
+ synchronized(totalSync) {
+ totalSentBytes += x;
+ }
+ }
+
+ public void receivedBytes(int x) { // adds x to the received-bytes total while holding totalSync
+ synchronized(totalSync) {
+ totalReceivedBytes += x;
+ }
+ }
+
+ public int getTotalSentBytes() { // NOTE(review): reads the counter without taking totalSync, unlike sentBytes() - confirm this is acceptable
+ return totalSentBytes;
+ }
+
+ public int getTotalReceivedBytes() { // NOTE(review): reads the counter without taking totalSync, unlike receivedBytes() - confirm this is acceptable
+ return totalReceivedBytes;
+ }
+
+ public void sentPayload(int x) { // forwards the payload count to the node; nothing is recorded on this handler
+ node.sentPayload(x);
+ }
+}
Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertSender.java 2008-01-29 23:01:32 UTC
(rev 17404)
+++ trunk/freenet/src/freenet/node/CHKInsertSender.java 2008-01-29 23:06:14 UTC
(rev 17405)
@@ -265,7 +265,7 @@
while(true) {
if(receiveFailed) {
- return; // don't need to set status as killed by InsertHandler
+ return; // don't need to set status as killed by
CHKInsertHandler
}
synchronized (this) {
@@ -338,7 +338,7 @@
sentRequest = true;
}
- if(receiveFailed) return; // don't need to set status as killed by
InsertHandler
+ if(receiveFailed) return; // don't need to set status as killed by
CHKInsertHandler
Message msg = null;
/*
@@ -358,7 +358,7 @@
}
if (receiveFailed)
- return; // don't need to set status as
killed by InsertHandler
+ return; // don't need to set status as
killed by CHKInsertHandler
if (msg == null) {
// Terminal overload
@@ -658,7 +658,7 @@
}
/**
- * Called by InsertHandler to notify that the receive has
+ * Called by CHKInsertHandler to notify that the receive has
* failed.
*/
public void receiveFailed() {
Deleted: trunk/freenet/src/freenet/node/InsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/InsertHandler.java 2008-01-29 23:01:32 UTC
(rev 17404)
+++ trunk/freenet/src/freenet/node/InsertHandler.java 2008-01-29 23:06:14 UTC
(rev 17405)
@@ -1,492 +0,0 @@
-/* This code is part of Freenet. It is distributed under the GNU General
- * Public License, version 2 (or at your option any later version). See
- * http://www.gnu.org/ for further details of the GPL. */
-package freenet.node;
-
-import freenet.io.comm.ByteCounter;
-import freenet.io.comm.DMT;
-import freenet.io.comm.DisconnectedException;
-import freenet.io.comm.Message;
-import freenet.io.comm.MessageFilter;
-import freenet.io.comm.NotConnectedException;
-import freenet.io.comm.RetrievalException;
-import freenet.io.xfer.AbortedException;
-import freenet.io.xfer.BlockReceiver;
-import freenet.io.xfer.PartiallyReceivedBlock;
-import freenet.keys.CHKBlock;
-import freenet.keys.CHKVerifyException;
-import freenet.keys.NodeCHK;
-import freenet.support.HexUtil;
-import freenet.support.Logger;
-import freenet.support.OOMHandler;
-import freenet.support.ShortBuffer;
-
-/**
- * @author amphibian
- *
- * Handle an incoming insert request.
- * This corresponds to RequestHandler.
- */
-public class InsertHandler implements Runnable, ByteCounter {
-
-
- static final int DATA_INSERT_TIMEOUT = 10000;
-
- final Message req;
- final Node node;
- final long uid;
- final PeerNode source;
- final NodeCHK key;
- final long startTime;
- private double closestLoc;
- private short htl;
- private CHKInsertSender sender;
- private byte[] headers;
- private BlockReceiver br;
- private Thread runThread;
- PartiallyReceivedBlock prb;
- private static boolean logMINOR;
-
- InsertHandler(Message req, PeerNode source, long id, Node node, long
startTime) {
- this.req = req;
- this.node = node;
- this.uid = id;
- this.source = source;
- this.startTime = startTime;
- key = (NodeCHK) req.getObject(DMT.FREENET_ROUTING_KEY);
- htl = req.getShort(DMT.HTL);
- closestLoc = req.getDouble(DMT.NEAREST_LOCATION);
- double targetLoc = key.toNormalizedDouble();
- double myLoc = node.lm.getLocation();
- if(Location.distance(targetLoc, myLoc) < Location.distance(targetLoc,
closestLoc)) {
- closestLoc = myLoc;
- htl = node.maxHTL();
- }
- logMINOR = Logger.shouldLog(Logger.MINOR, this);
- receivedBytes(req.receivedByteCount());
- }
-
- public String toString() {
- return super.toString()+" for "+uid;
- }
-
- public void run() {
- freenet.support.Logger.OSThread.logPID(this);
- try {
- realRun();
- } catch (OutOfMemoryError e) {
- OOMHandler.handleOOM(e);
- } catch (Throwable t) {
- Logger.error(this, "Caught in run() "+t, t);
- } finally {
- if(logMINOR) Logger.minor(this, "Exiting InsertHandler.run()
for "+uid);
- node.unlockUID(uid, false, true, false);
- }
- }
-
- private void realRun() {
- runThread = Thread.currentThread();
-
- // FIXME implement rate limiting or something!
- // Send Accepted
- Message accepted = DMT.createFNPAccepted(uid);
- try {
- //Using sendSync here will help the next message filter
not timeout... wait here or at the message filter.
- source.sendSync(accepted, this);
- } catch (NotConnectedException e1) {
- if(logMINOR) Logger.minor(this, "Lost connection to
source");
- return;
- }
-
- // Source will send us a DataInsert
-
- MessageFilter mf;
- mf =
MessageFilter.create().setType(DMT.FNPDataInsert).setField(DMT.UID,
uid).setSource(source).setTimeout(DATA_INSERT_TIMEOUT);
-
- Message msg;
- try {
- msg = node.usm.waitFor(mf, this);
- } catch (DisconnectedException e) {
- Logger.normal(this, "Disconnected while waiting for DataInsert on
"+uid);
- return;
- }
-
- if(logMINOR) Logger.minor(this, "Received "+msg);
-
- if(msg == null) {
- try {
- if(source.isConnected() && (startTime >
(source.timeLastConnectionCompleted()+Node.HANDSHAKE_TIMEOUT*4)))
- Logger.error(this, "Did not receive DataInsert
on "+uid+" from "+source+" !");
- Message tooSlow = DMT.createFNPRejectedTimeout(uid);
- source.sendAsync(tooSlow, null, 0, this);
- Message m = DMT.createFNPInsertTransfersCompleted(uid,
true);
- source.sendAsync(m, null, 0, this);
- prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK,
Node.PACKET_SIZE);
- br = new BlockReceiver(node.usm, source, uid, prb,
this);
- prb.abort(RetrievalException.NO_DATAINSERT, "No
DataInsert");
- br.sendAborted(RetrievalException.NO_DATAINSERT, "No
DataInsert");
- return;
- } catch (NotConnectedException e) {
- if(logMINOR) Logger.minor(this, "Lost connection to
source");
- return;
- }
- }
-
- // We have a DataInsert
- headers = ((ShortBuffer)msg.getObject(DMT.BLOCK_HEADERS)).getData();
- // FIXME check the headers
-
- // Now create an CHKInsertSender, or use an existing one, or
- // discover that the data is in the store.
-
- // From this point onwards, if we return cleanly we must go through
finish().
-
- prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK,
Node.PACKET_SIZE);
- if(htl > 0)
- sender = node.makeInsertSender(key, htl, uid, source, headers,
prb, false, closestLoc, true);
- br = new BlockReceiver(node.usm, source, uid, prb, this);
-
- // Receive the data, off thread
- Runnable dataReceiver = new DataReceiver();
- receiveStarted = true;
- node.executor.execute(dataReceiver, "InsertHandler$DataReceiver for
UID "+uid);
-
- if(htl == 0) {
- canCommit = true;
- msg = DMT.createFNPInsertReply(uid);
- try {
- source.sendSync(msg, this);
- } catch (NotConnectedException e) {
- // Ignore
- }
- finish(CHKInsertSender.SUCCESS);
- return;
- }
-
- // Wait...
- // What do we want to wait for?
- // If the data receive completes, that's very nice,
- // but doesn't really matter. What matters is what
- // happens to the CHKInsertSender. If the data receive
- // fails, that does matter...
-
- // We are waiting for a terminal status on the CHKInsertSender,
- // including REPLIED_WITH_DATA.
- // If we get transfer failed, we can check whether the receive
- // failed first. If it did it's not our fault.
- // If the receive failed, and we haven't started transferring
- // yet, we probably want to kill the sender.
- // So we call the wait method on the CHKInsertSender, but we
- // also have a flag locally to indicate the receive failed.
- // And if it does, we interrupt.
-
- boolean receivedRejectedOverload = false;
-
- while(true) {
- synchronized(sender) {
- try {
- if(sender.getStatus() == CHKInsertSender.NOT_FINISHED)
- sender.wait(5000);
- } catch (InterruptedException e) {
- // Cool, probably this is because the receive failed...
- }
- }
- if(receiveFailed()) {
- // Nothing else we can do
- finish(CHKInsertSender.RECEIVE_FAILED);
- return;
- }
-
- if((!receivedRejectedOverload) &&
sender.receivedRejectedOverload()) {
- receivedRejectedOverload = true;
- // Forward it
- Message m = DMT.createFNPRejectedOverload(uid, false);
- try {
- source.sendSync(m, this);
- } catch (NotConnectedException e) {
- if(logMINOR) Logger.minor(this, "Lost
connection to source");
- return;
- }
- }
-
- int status = sender.getStatus();
-
- if(status == CHKInsertSender.NOT_FINISHED) {
- continue;
- }
-
-// // FIXME obviously! For debugging load issues.
-// if(node.myName.equalsIgnoreCase("Toad #1") &&
-// node.random.nextBoolean()) {
-// // Maliciously timeout
-// Logger.error(this, "Maliciously timing out: was
"+sender.getStatusString());
-// sentSuccess = true;
-// return;
-// }
-
- // Local RejectedOverload's (fatal).
- // Internal error counts as overload. It'd only create a timeout
otherwise, which is the same thing anyway.
- // We *really* need a good way to deal with nodes that constantly
R_O!
- if((status == CHKInsertSender.TIMED_OUT) ||
- (status == CHKInsertSender.GENERATED_REJECTED_OVERLOAD)
||
- (status == CHKInsertSender.INTERNAL_ERROR)) {
- msg = DMT.createFNPRejectedOverload(uid, true);
- try {
- source.sendSync(msg, this);
- } catch (NotConnectedException e) {
- if(logMINOR) Logger.minor(this, "Lost
connection to source");
- return;
- }
- // Might as well store it anyway.
- if((status == CHKInsertSender.TIMED_OUT) ||
- (status ==
CHKInsertSender.GENERATED_REJECTED_OVERLOAD))
- canCommit = true;
- finish(status);
- return;
- }
-
- if((status == CHKInsertSender.ROUTE_NOT_FOUND) || (status ==
CHKInsertSender.ROUTE_REALLY_NOT_FOUND)) {
- msg = DMT.createFNPRouteNotFound(uid, sender.getHTL());
- try {
- source.sendSync(msg, this);
- } catch (NotConnectedException e) {
- if(logMINOR) Logger.minor(this, "Lost
connection to source");
- return;
- }
- canCommit = true;
- finish(status);
- return;
- }
-
- if(status == CHKInsertSender.RECEIVE_FAILED) {
- // Probably source's fault.
- finish(status);
- return;
- }
-
- if(status == CHKInsertSender.SUCCESS) {
- msg = DMT.createFNPInsertReply(uid);
- try {
- source.sendSync(msg, this);
- } catch (NotConnectedException e) {
- Logger.minor(this, "Lost connection to
source");
- return;
- }
- canCommit = true;
- finish(status);
- return;
- }
-
- // Otherwise...?
- Logger.error(this, "Unknown status code:
"+sender.getStatusString());
- msg = DMT.createFNPRejectedOverload(uid, true);
- try {
- source.sendSync(msg, this);
- } catch (NotConnectedException e) {
- // Ignore
- }
- finish(CHKInsertSender.INTERNAL_ERROR);
- return;
- }
- }
-
- private boolean canCommit = false;
- private boolean sentCompletion = false;
- private Object sentCompletionLock = new Object();
-
- /**
- * If canCommit, and we have received all the data, and it
- * verifies, then commit it.
- */
- private void finish(int code) {
- if(logMINOR) Logger.minor(this, "Waiting for receive");
- synchronized(this) {
- while(receiveStarted && !receiveCompleted) {
- try {
- wait(100*1000);
- } catch (InterruptedException e) {
- // Ignore
- }
- }
- }
-
- maybeCommit();
-
- if(logMINOR) Logger.minor(this, "Waiting for completion");
- // Wait for completion
- boolean sentCompletionWasSet;
- synchronized(sentCompletionLock) {
- sentCompletionWasSet = sentCompletion;
- sentCompletion = true;
- }
-
- Message m=null;
-
- if((sender != null) && (!sentCompletionWasSet)) {
- //If there are downstream senders, our final success
report depends on there being no timeouts in the chain.
- while(true) {
- synchronized(sender) {
- if(sender.completed()) {
- break;
- }
- try {
- sender.wait(10*1000);
- } catch (InterruptedException e) {
- // Loop
- }
- }
- }
- boolean failed = sender.anyTransfersFailed();
- m = DMT.createFNPInsertTransfersCompleted(uid, failed);
- }
-
- if((sender == null) && (!sentCompletionWasSet) && (canCommit)) {
- //There are no downstream senders, but we stored the
data locally, report successful transfer.
- //Note that this is done even if the verify fails.
- m = DMT.createFNPInsertTransfersCompleted(uid, false /*
no timeouts */);
- }
-
- try {
- source.sendSync(m, this);
- if(logMINOR) Logger.minor(this, "Sent completion: "+m+"
for "+this);
- } catch (NotConnectedException e1) {
- if(logMINOR) Logger.minor(this, "Not connected:
"+source+" for "+this);
- // May need to commit anyway...
- }
-
- if(code != CHKInsertSender.TIMED_OUT && code !=
CHKInsertSender.GENERATED_REJECTED_OVERLOAD &&
- code != CHKInsertSender.INTERNAL_ERROR && code !=
CHKInsertSender.ROUTE_REALLY_NOT_FOUND &&
- code != CHKInsertSender.RECEIVE_FAILED &&
!receiveFailed()) {
- int totalSent = getTotalSentBytes();
- int totalReceived = getTotalReceivedBytes();
- if(sender != null) {
- totalSent += sender.getTotalSentBytes();
- totalReceived += sender.getTotalReceivedBytes();
- }
- if(logMINOR) Logger.minor(this, "Remote CHK insert cost
"+totalSent+ '/' +totalReceived+" bytes ("+code+ ") receive failed =
"+receiveFailed());
-
node.nodeStats.remoteChkInsertBytesSentAverage.report(totalSent);
-
node.nodeStats.remoteChkInsertBytesReceivedAverage.report(totalReceived);
- if(code == CHKInsertSender.SUCCESS) {
- // Report both sent and received because we have both a
Handler and a Sender
- if(sender != null && sender.startedSendingData())
-
node.nodeStats.successfulChkInsertBytesSentAverage.report(totalSent);
-
node.nodeStats.successfulChkInsertBytesReceivedAverage.report(totalReceived);
- }
- }
- }
-
- /**
- * Verify data, or send DataInsertRejected.
- */
- private void maybeCommit() {
- Message toSend = null;
-
- synchronized(this) { // REDFLAG do not use synch(this) for any other
purpose!
- if((prb == null) || prb.isAborted()) return;
- try {
- if(!canCommit) return;
- if(!prb.allReceived()) return;
- CHKBlock block = new CHKBlock(prb.getBlock(), headers, key);
- node.store(block);
- if(logMINOR) Logger.minor(this, "Committed");
- } catch (CHKVerifyException e) {
- Logger.error(this, "Verify failed in InsertHandler: "+e+" -
headers: "+HexUtil.bytesToHex(headers), e);
- toSend = DMT.createFNPDataInsertRejected(uid,
DMT.DATA_INSERT_REJECTED_VERIFY_FAILED);
- } catch (AbortedException e) {
- Logger.error(this, "Receive failed: "+e);
- // Receiver thread (below) will handle sending the failure
notice
- }
- }
- if(toSend != null) {
- try {
- source.sendAsync(toSend, null, 0, this);
- } catch (NotConnectedException e) {
- // :(
- if(logMINOR) Logger.minor(this, "Lost connection in "+this+"
when sending FNPDataInsertRejected");
- }
- }
- }
-
- /** Has the receive failed? If so, there's not much more that can be
done... */
- private boolean receiveFailed;
-
- private boolean receiveStarted;
- private boolean receiveCompleted;
-
- public class DataReceiver implements Runnable {
-
- public void run() {
- freenet.support.Logger.OSThread.logPID(this);
- if(logMINOR) Logger.minor(this, "Receiving data for
"+InsertHandler.this);
- try {
- br.receive();
- if(logMINOR) Logger.minor(this, "Received data for
"+InsertHandler.this);
- synchronized(InsertHandler.this) {
- receiveCompleted = true;
- InsertHandler.this.notifyAll();
- }
- } catch (RetrievalException e) {
- synchronized(InsertHandler.this) {
- receiveCompleted = true;
- receiveFailed = true;
- InsertHandler.this.notifyAll();
- }
- // Cancel the sender
- if(sender != null)
- sender.receiveFailed(); // tell it to stop if it hasn't
already failed... unless it's sending from store
- runThread.interrupt();
- Message msg = DMT.createFNPDataInsertRejected(uid,
DMT.DATA_INSERT_REJECTED_RECEIVE_FAILED);
- try {
- source.sendSync(msg, InsertHandler.this);
- } catch (NotConnectedException ex) {
- //If they are not connected, that's
probably why the receive failed!
- if (logMINOR) Logger.minor(this, "Can't send "+msg+" to
"+source+": "+ex);
- }
- if
(e.getReason()==RetrievalException.SENDER_DISCONNECTED)
- Logger.normal(this, "Failed to retrieve
(disconnect): "+e, e);
- else
- Logger.error(this, "Failed to retrieve
("+e.getReason()+"/"+RetrievalException.getErrString(e.getReason())+"): "+e, e);
- return;
- } catch (Throwable t) {
- Logger.error(this, "Caught "+t, t);
- }
- }
-
- public String toString() {
- return super.toString()+" for "+uid;
- }
-
- }
-
- private synchronized boolean receiveFailed() {
- return receiveFailed;
- }
-
- private final Object totalSync = new Object();
- private int totalSentBytes;
- private int totalReceivedBytes;
-
- public void sentBytes(int x) {
- synchronized(totalSync) {
- totalSentBytes += x;
- }
- }
-
- public void receivedBytes(int x) {
- synchronized(totalSync) {
- totalReceivedBytes += x;
- }
- }
-
- public int getTotalSentBytes() {
- return totalSentBytes;
- }
-
- public int getTotalReceivedBytes() {
- return totalReceivedBytes;
- }
-
- public void sentPayload(int x) {
- node.sentPayload(x);
- }
-}
Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java 2008-01-29 23:01:32 UTC
(rev 17404)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java 2008-01-29 23:06:14 UTC
(rev 17405)
@@ -282,12 +282,12 @@
long now = System.currentTimeMillis();
if(m.getSpec().equals(DMT.FNPSSKInsertRequest)) {
SSKInsertHandler rh = new SSKInsertHandler(m, source,
id, node, now);
- node.executor.execute(rh, "InsertHandler for "+id+" on
"+node.getDarknetPortNumber());
+ node.executor.execute(rh, "CHKInsertHandler for "+id+"
on "+node.getDarknetPortNumber());
} else {
- InsertHandler rh = new InsertHandler(m, source, id,
node, now);
- node.executor.execute(rh, "InsertHandler for "+id+" on
"+node.getDarknetPortNumber());
+ CHKInsertHandler rh = new CHKInsertHandler(m, source,
id, node, now);
+ node.executor.execute(rh, "CHKInsertHandler for "+id+"
on "+node.getDarknetPortNumber());
}
- if(logMINOR) Logger.minor(this, "Started InsertHandler for
"+id);
+ if(logMINOR) Logger.minor(this, "Started CHKInsertHandler for
"+id);
return true;
}