Author: toad
Date: 2009-01-15 20:25:08 +0000 (Thu, 15 Jan 2009)
New Revision: 25064
Modified:
trunk/freenet/src/freenet/io/comm/RetrievalException.java
trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/NodeClientCore.java
trunk/freenet/src/freenet/node/PeerManager.java
trunk/freenet/src/freenet/node/PeerNode.java
trunk/freenet/src/freenet/node/RequestHandler.java
trunk/freenet/src/freenet/node/RequestSender.java
trunk/freenet/src/freenet/node/RequestTag.java
Log:
Turtling support. Not tested yet, probably horribly buggy.
Modified: trunk/freenet/src/freenet/io/comm/RetrievalException.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/RetrievalException.java 2009-01-15
15:32:04 UTC (rev 25063)
+++ trunk/freenet/src/freenet/io/comm/RetrievalException.java 2009-01-15
20:25:08 UTC (rev 25064)
@@ -40,6 +40,8 @@
public static final int CANCELLED_BY_RECEIVER = 9;
public static final int RECEIVER_DIED = 11;
public static final int UNABLE_TO_SEND_BLOCK_WITHIN_TIMEOUT = 12;
+ public static final int GONE_TO_TURTLE_MODE = 13;
+ public static final int TURTLE_KILLED = 14;
int _reason;
String _cause;
Modified: trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockReceiver.java 2009-01-15
15:32:04 UTC (rev 25063)
+++ trunk/freenet/src/freenet/io/xfer/BlockReceiver.java 2009-01-15
20:25:08 UTC (rev 25064)
@@ -68,6 +68,7 @@
private MessageFilter discardFilter;
private long discardEndTime;
private boolean tookTooLong;
+ private boolean senderAborted;
// private final boolean _doTooLong;
boolean logMINOR=Logger.shouldLog(Logger.MINOR, this);
@@ -131,6 +132,9 @@
if (desc.indexOf("Upstream")<0)
desc="Upstream transmit error: "+desc;
_prb.abort(m1.getInt(DMT.REASON), desc);
+ synchronized(this) {
+ senderAborted = true;
+ }
throw new
RetrievalException(m1.getInt(DMT.REASON), desc);
}
if ((m1 != null) &&
(m1.getSpec().equals(DMT.packetTransmit))) {
@@ -267,4 +271,7 @@
return tookTooLong;
}
+ public synchronized boolean senderAborted() {
+ return senderAborted;
+ }
}
Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2009-01-15
15:32:04 UTC (rev 25063)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2009-01-15
20:25:08 UTC (rev 25064)
@@ -141,6 +141,14 @@
};
}
+ public void abortSend(int reason, String desc) throws
NotConnectedException {
+ synchronized(this) {
+ if(_sendComplete) return;
+ _sendComplete = true;
+ }
+ sendAborted(reason, desc);
+ }
+
public void sendAborted(int reason, String desc) throws
NotConnectedException {
_usm.send(_destination, DMT.createSendAborted(_uid, reason,
desc), _ctr);
}
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2009-01-15 15:32:04 UTC (rev
25063)
+++ trunk/freenet/src/freenet/node/Node.java 2009-01-15 20:25:08 UTC (rev
25064)
@@ -62,7 +62,9 @@
import freenet.io.comm.Peer;
import freenet.io.comm.PeerParseException;
import freenet.io.comm.ReferenceSignatureVerificationException;
+import freenet.io.comm.RetrievalException;
import freenet.io.comm.UdpSocketHandler;
+import freenet.io.xfer.BlockReceiver;
import freenet.io.xfer.PartiallyReceivedBlock;
import freenet.keys.CHKBlock;
import freenet.keys.CHKVerifyException;
@@ -4059,4 +4061,113 @@
if(container == null) return NullLinkFixer.instance;
return container;
}
+
+ /**
+ * Make a running request sender into a turtle request.
+ * Backoff: when the transfer finishes, or after 30 seconds (see RequestSender.setTurtle()) if no backoff has been sent by then.
+ * Downstream: Cancel all dependent RequestHandlers and local requests.
+ * This also removes it from the load management code.
+ * Registration: We track the turtles for each peer, and overall. No two turtles from the
+ * same node may share the same key, and there is an overall limit.
+ * @param sender The running RequestSender to turn into a turtle.
+ */
+ public void makeTurtle(RequestSender sender) {
+ // Registration
+ // FIXME check the datastore.
+ if(!this.registerTurtleTransfer(sender)) {
+ // Too many turtles running, or already two turtles for
this key (we allow two in case one peer turtles as a DoS).
+ sender.killTurtle();
+ return;
+ }
+ PeerNode from = sender.transferringFrom();
+ if(!from.registerTurtleTransfer(sender)) {
+ // This peer has too many turtles running, or already has a turtle for this key.
+ // Abort it.
+ unregisterTurtleTransfer(sender);
+ sender.killTurtle();
+ return;
+ }
+ // Do not transfer coalesce!!
+ synchronized(transferringRequestSenders) {
+ transferringRequestSenders.remove((NodeCHK)sender.key);
+ }
+
+ // Abort downstream transfers, set the turtle mode flag and set
up the backoff callback.
+ sender.setTurtle();
+ }
+
+ private static int MAX_TURTLES = 10;
+ private static int MAX_TURTLES_PER_KEY = 2;
+
+ private HashMap<Key,RequestSender[]> turtlingTransfers = new
HashMap<Key,RequestSender[]>();
+
+ private boolean registerTurtleTransfer(RequestSender sender) {
+ Key key = sender.key;
+ synchronized(turtlingTransfers) {
+ if(turtlingTransfers.size() >= MAX_TURTLES) return
false;
+ if(!turtlingTransfers.containsKey(key)) {
+ turtlingTransfers.put(key, new RequestSender[]
{ sender });
+ return true;
+ } else {
+ RequestSender[] senders =
turtlingTransfers.get(key);
+ if(senders.length >= MAX_TURTLES_PER_KEY)
return false;
+ for(int i=0;i<senders.length;i++) {
+ if(senders[i] == sender) {
+ Logger.error(this, "Registering
turtle for "+sender+" : "+key+" twice! (globally)");
+ return false;
+ }
+ }
+ RequestSender[] newSenders = new
RequestSender[senders.length+1];
+ System.arraycopy(senders, 0, newSenders, 0,
senders.length);
+ newSenders[senders.length] = sender;
+ turtlingTransfers.put(key, newSenders);
+ return true;
+ }
+ }
+ }
+
+ public void unregisterTurtleTransfer(RequestSender sender) {
+ Key key = sender.key;
+ synchronized(turtlingTransfers) {
+ if(!turtlingTransfers.containsKey(key)) {
+ Logger.error(this, "Removing turtle "+sender+"
for "+key+" : DOES NOT EXIST IN GLOBAL TURTLES LIST");
+ return;
+ }
+ RequestSender[] senders = turtlingTransfers.get(key);
+ if(senders.length == 1 && senders[0] == sender) {
+ turtlingTransfers.remove(key);
+ return;
+ }
+ if(senders.length == 2) {
+ if(senders[0] == sender) {
+ turtlingTransfers.put(key, new
RequestSender[] { senders[1] });
+ } else if(senders[1] == sender) {
+ turtlingTransfers.put(key, new
RequestSender[] { senders[0] });
+ }
+ return;
+ }
+ int x = 0;
+ for(int i=0;i<senders.length;i++) {
+ if(senders[i] == sender) x++;
+ }
+ if(x == 0) {
+ Logger.error(this, "Turtle not in global
register: "+sender+" for "+key);
+ return;
+ }
+ if(senders.length == x) {
+ Logger.error(this, "Lots of copies of turtle:
"+x);
+ turtlingTransfers.remove(key);
+ return;
+ }
+ RequestSender[] newSenders = new
RequestSender[senders.length - x];
+ int idx = 0;
+ for(RequestSender s : senders) {
+ if(s == sender) continue;
+ newSenders[idx++] = s;
+ }
+ turtlingTransfers.put(key, newSenders);
+ }
+ }
+
+
}
Modified: trunk/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeClientCore.java 2009-01-15 15:32:04 UTC
(rev 25063)
+++ trunk/freenet/src/freenet/node/NodeClientCore.java 2009-01-15 20:25:08 UTC
(rev 25064)
@@ -556,6 +556,10 @@
if(listener != null)
listener.completed(status ==
RequestSender.SUCCESS);
}
+
+ public void onAbortDownstreamTransfers(int reason,
String desc) {
+ // Ignore, onRequestSenderFinished will also be
called.
+ }
}, tag);
}
Modified: trunk/freenet/src/freenet/node/PeerManager.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerManager.java 2009-01-15 15:32:04 UTC
(rev 25063)
+++ trunk/freenet/src/freenet/node/PeerManager.java 2009-01-15 20:25:08 UTC
(rev 25064)
@@ -896,6 +896,10 @@
Logger.minor(this, "Skipping (not
connected): " + p.getPeer());
continue;
}
+ if(key != null && p.isTurtling(key)) {
+ if(logMINOR)
+ Logger.minor(this, "Skipping (already
turtling key): "+p.getPeer());
+ }
if(minVersion > 0 &&
Version.getArbitraryBuildNumber(p.getVersion(), -1) < minVersion) {
if(logMINOR)
Logger.minor(this, "Skipping old
version: " + p.getPeer());
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2009-01-15 15:32:04 UTC
(rev 25063)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2009-01-15 20:25:08 UTC
(rev 25064)
@@ -4266,4 +4266,40 @@
if(logMINOR) Logger.minor(this, "getReusableTrackerID():
"+cur.packets.trackerID+" on "+this);
return cur.packets.trackerID;
}
+
+ static final int MAX_TURTLES_PER_PEER = 3;
+
+ private HashMap<Key,RequestSender> turtlingTransfers = new
HashMap<Key,RequestSender>();
+
+ public boolean registerTurtleTransfer(RequestSender sender) {
+ Key key = sender.key;
+ synchronized(turtlingTransfers) {
+ if(turtlingTransfers.size() >= MAX_TURTLES_PER_PEER)
return false;
+ if(turtlingTransfers.containsKey(key)) return false;
+ turtlingTransfers.put(key, sender);
+ return true;
+ }
+ }
+
+ public void unregisterTurtleTransfer(RequestSender sender) {
+ Key key = sender.key;
+ synchronized(turtlingTransfers) {
+ if(!turtlingTransfers.containsKey(key)) {
+ Logger.error(this, "Removing turtle transfer
"+sender+" for "+key+" from "+this+" : DOES NOT EXIST");
+ return;
+ }
+ RequestSender oldSender = turtlingTransfers.remove(key);
+ if(oldSender != sender) {
+ Logger.error(this, "Removing turtle transfer
"+sender+" for "+key+" from "+this+" : WRONG SENDER: "+oldSender);
+ turtlingTransfers.put(key, oldSender);
+ return;
+ }
+ }
+ }
+
+ public boolean isTurtling(Key key) {
+ synchronized(turtlingTransfers) {
+ return turtlingTransfers.containsKey(key);
+ }
+ }
}
Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java 2009-01-15 15:32:04 UTC
(rev 25063)
+++ trunk/freenet/src/freenet/node/RequestHandler.java 2009-01-15 20:25:08 UTC
(rev 25064)
@@ -201,6 +201,21 @@
Logger.normal(this, "requestor is gone, can't begin CHK
transfer");
}
}
+
+ public void onAbortDownstreamTransfers(int reason, String desc) {
+ if(bt == null) {
+ Logger.error(this, "No downstream transfer to abort! on
"+this);
+ return;
+ }
+ if(logMINOR)
+ Logger.minor(this, "Aborting downstream transfer on
"+this);
+ tag.onAbortDownstreamTransfers(reason, desc);
+ try {
+ bt.abortSend(reason, desc);
+ } catch (NotConnectedException e) {
+ // Ignore
+ }
+ }
private void waitAndFinishCHKTransferOffThread() {
node.executor.execute(new Runnable() {
Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java 2009-01-15 15:32:04 UTC
(rev 25063)
+++ trunk/freenet/src/freenet/node/RequestSender.java 2009-01-15 20:25:08 UTC
(rev 25064)
@@ -74,6 +74,9 @@
private byte[] sskData;
private SSKBlock block;
private boolean hasForwarded;
+ private PeerNode transferringFrom;
+ private boolean turtleMode;
+ private boolean sentBackoffTurtle;
/** If true, only try to fetch the key from nodes which have offered it */
private boolean tryOffersOnly;
@@ -775,12 +778,49 @@
try {
if(logMINOR) Logger.minor(this,
"Receiving data");
- byte[] data = br.receive();
+ final PeerNode from = next;
+ synchronized(this) {
+ transferringFrom = next;
+ }
+ node.getTicker().queueTimedJob(new
Runnable() {
+
+ public void
run() {
+
synchronized(RequestSender.this) {
+
if(transferringFrom != from) return;
+ }
+
node.makeTurtle(RequestSender.this);
+ }
+
+ }, 60*1000);
+ byte[] data;
+ try {
+ data = br.receive();
+ } finally {
+ synchronized(this) {
+ transferringFrom = null;
+ }
+ }
+
long tEnd = System.currentTimeMillis();
this.transferTime = tEnd - tStart;
- if(!br.tookTooLong())
+ boolean turtle;
+ boolean turtleBackedOff;
+ synchronized(this) {
+ turtle = turtleMode;
+ turtleBackedOff =
sentBackoffTurtle;
+ sentBackoffTurtle = true;
+ }
+ if(!turtle)
next.transferSuccess();
+ else {
+ if(!turtleBackedOff)
+
next.transferFailed("Turtled transfer");
+ }
next.successNotOverload();
+ if(turtle) {
+ next.unregisterTurtleTransfer(this);
+ node.unregisterTurtleTransfer(this);
+ }
if(logMINOR) Logger.minor(this,
"Received data");
// Received data
try {
@@ -802,8 +842,20 @@
next.localRejectedOverload("TransferFailedRequest"+e.getReason());
finish(TRANSFER_FAILED, next, false);
node.failureTable.onFinalFailure(key,
next, htl, FailureTable.REJECT_TIME, source);
- if(!br.tookTooLong())
-
next.transferFailed("RequestSenderTransferFailed");
+ if(!turtleMode) {
+ int reason = e.getReason();
+ if((!br.senderAborted()) &&
+ reason ==
RetrievalException.SENDER_DIED || reason == RetrievalException.RECEIVER_DIED ||
reason == RetrievalException.TIMED_OUT
+ || reason ==
RetrievalException.UNABLE_TO_SEND_BLOCK_WITHIN_TIMEOUT) {
+ // Looks like a
timeout. Backoff.
+
next.transferFailed(e.getMessage());
+ } else {
+ // Quick failure (in that we didn't have to time out). Don't back off.
+ // Treat as a DNF.
+
node.failureTable.onFinalFailure(key, next, htl, FailureTable.REJECT_TIME,
source);
+ }
+
+ }
return;
}
} finally {
@@ -1308,6 +1360,9 @@
void onCHKTransferBegins();
/** Should return quickly, allocate a thread if it needs to
block etc */
void onRequestSenderFinished(int status);
+ /** Abort downstream transfers (not necessarily upstream ones,
so not via the PRB).
+ * Should return quickly, allocate a thread if it needs to
block etc. */
+ void onAbortDownstreamTransfers(int reason, String desc);
}
public void addListener(Listener l) {
@@ -1316,10 +1371,13 @@
boolean reject=false;
boolean transfer=false;
boolean sentFinished;
+ boolean sentTransferCancel = false;
int status;
synchronized (this) {
synchronized (listeners) {
- listeners.add(l);
+ sentTransferCancel =
sentAbortDownstreamTransfers;
+ if(!sentTransferCancel)
+ listeners.add(l);
reject = sentReceivedRejectOverload;
transfer = sentCHKTransferBegins;
sentFinished = sentRequestSenderFinished;
@@ -1332,6 +1390,8 @@
l.onReceivedRejectOverload();
if (transfer)
l.onCHKTransferBegins();
+ if(sentTransferCancel)
+
l.onAbortDownstreamTransfers(abortDownstreamTransfersReason,
abortDownstreamTransfersDesc);
if (status!=NOT_FINISHED && sentFinished)
l.onRequestSenderFinished(status);
}
@@ -1382,7 +1442,56 @@
}
}
+ private boolean sentAbortDownstreamTransfers;
+ private int abortDownstreamTransfersReason;
+ private String abortDownstreamTransfersDesc;
+
+ private void sendAbortDownstreamTransfers(int reason, String desc) {
+ synchronized (listeners) {
+ abortDownstreamTransfersReason = reason;
+ abortDownstreamTransfersDesc = desc;
+ sentAbortDownstreamTransfers = true;
+ for (Listener l : listeners) {
+ try {
+ l.onAbortDownstreamTransfers(reason,
desc);
+
l.onRequestSenderFinished(TRANSFER_FAILED);
+ } catch (Throwable t) {
+ Logger.error(this, "Caught: "+t, t);
+ }
+ }
+ listeners.clear();
+ }
+ }
+
public int getPriority() {
return NativeThread.HIGH_PRIORITY;
}
+
+ public void setTurtle() {
+ synchronized(this) {
+ this.turtleMode = true;
+ }
+
sendAbortDownstreamTransfers(RetrievalException.GONE_TO_TURTLE_MODE,
"Turtling");
+ node.getTicker().queueTimedJob(new Runnable() {
+
+ public void run() {
+ synchronized(RequestSender.this) {
+ if(sentBackoffTurtle) return;
+ sentBackoffTurtle = true;
+ }
+ transferringFrom.transferFailed("Turtled
transfer");
+ }
+
+ }, 30*1000);
+ }
+
+ public PeerNode transferringFrom() {
+ return transferringFrom;
+ }
+
+ public void killTurtle() {
+ prb.abort(RetrievalException.TURTLE_KILLED, "Too many turtles /
already have turtles for this key");
+ node.failureTable.onFinalFailure(key, transferringFrom(), htl,
FailureTable.REJECT_TIME, source);
+ }
+
}
Modified: trunk/freenet/src/freenet/node/RequestTag.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestTag.java 2009-01-15 15:32:04 UTC
(rev 25063)
+++ trunk/freenet/src/freenet/node/RequestTag.java 2009-01-15 20:25:08 UTC
(rev 25064)
@@ -25,6 +25,9 @@
int requestSenderFinishedCode;
Throwable handlerThrew;
boolean rejected;
+ boolean abortedDownstreamTransfer;
+ int abortedDownstreamReason;
+ String abortedDownstreamDesc;
public RequestTag(boolean isSSK, START start) {
super();
@@ -73,10 +76,22 @@
sb.append(" finishedCode=").append(requestSenderFinishedCode);
sb.append(" rejected=").append(rejected);
sb.append(" thrown=").append(handlerThrew);
+ if(abortedDownstreamTransfer) {
+ sb.append(" abortedDownstreamTransfer reason=");
+ sb.append(abortedDownstreamReason);
+ sb.append(" desc=");
+ sb.append(abortedDownstreamDesc);
+ }
if(handlerThrew != null)
Logger.error(this, sb.toString(), handlerThrew);
else
Logger.error(this, sb.toString());
}
+ public void onAbortDownstreamTransfers(int reason, String desc) {
+ abortedDownstreamTransfer = true;
+ abortedDownstreamReason = reason;
+ abortedDownstreamDesc = desc;
+ }
+
}
_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs