Author: toad
Date: 2009-01-15 20:25:08 +0000 (Thu, 15 Jan 2009)
New Revision: 25064

Modified:
   trunk/freenet/src/freenet/io/comm/RetrievalException.java
   trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
   trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
   trunk/freenet/src/freenet/node/Node.java
   trunk/freenet/src/freenet/node/NodeClientCore.java
   trunk/freenet/src/freenet/node/PeerManager.java
   trunk/freenet/src/freenet/node/PeerNode.java
   trunk/freenet/src/freenet/node/RequestHandler.java
   trunk/freenet/src/freenet/node/RequestSender.java
   trunk/freenet/src/freenet/node/RequestTag.java
Log:
Turtling support. Not tested yet, probably horribly buggy.


Modified: trunk/freenet/src/freenet/io/comm/RetrievalException.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/RetrievalException.java   2009-01-15 
15:32:04 UTC (rev 25063)
+++ trunk/freenet/src/freenet/io/comm/RetrievalException.java   2009-01-15 
20:25:08 UTC (rev 25064)
@@ -40,6 +40,8 @@
     public static final int CANCELLED_BY_RECEIVER = 9;
        public static final int RECEIVER_DIED = 11;
        public static final int UNABLE_TO_SEND_BLOCK_WITHIN_TIMEOUT = 12;
+       public static final int GONE_TO_TURTLE_MODE = 13;
+       public static final int TURTLE_KILLED = 14;
        
        int _reason;
        String _cause;

Modified: trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockReceiver.java        2009-01-15 
15:32:04 UTC (rev 25063)
+++ trunk/freenet/src/freenet/io/xfer/BlockReceiver.java        2009-01-15 
20:25:08 UTC (rev 25064)
@@ -68,6 +68,7 @@
        private MessageFilter discardFilter;
        private long discardEndTime;
        private boolean tookTooLong;
+       private boolean senderAborted;
 //     private final boolean _doTooLong;
 
        boolean logMINOR=Logger.shouldLog(Logger.MINOR, this);
@@ -131,6 +132,9 @@
                                if (desc.indexOf("Upstream")<0)
                                        desc="Upstream transmit error: "+desc;
                                _prb.abort(m1.getInt(DMT.REASON), desc);
+                               synchronized(this) {
+                                       senderAborted = true;
+                               }
                                throw new 
RetrievalException(m1.getInt(DMT.REASON), desc);
                        }
                        if ((m1 != null) && 
(m1.getSpec().equals(DMT.packetTransmit))) {
@@ -267,4 +271,7 @@
                return tookTooLong;
        }
        
+       /**
+        * Reports whether the sender aborted this transfer. The flag is set by the
+        * receive path right after it aborts the block with the reason taken from
+        * the incoming message and just before it throws the RetrievalException.
+        * @return the current value of the senderAborted flag, read while holding
+        * this object's monitor.
+        */
+       public synchronized boolean senderAborted() {
+               return senderAborted;
+       }
 }

Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2009-01-15 
15:32:04 UTC (rev 25063)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2009-01-15 
20:25:08 UTC (rev 25064)
@@ -141,6 +141,14 @@
                };
        }
 
+       /**
+        * Abort the send at most once. Marks the send as complete and, unless it
+        * was already marked complete, notifies the destination via sendAborted().
+        * @param reason Reason code passed on to sendAborted().
+        * @param desc Description passed on to sendAborted().
+        * @throws NotConnectedException If thrown by sendAborted().
+        */
+       public void abortSend(int reason, String desc) throws NotConnectedException {
+               final boolean alreadyComplete;
+               synchronized(this) {
+                       alreadyComplete = _sendComplete;
+                       _sendComplete = true;
+               }
+               if(alreadyComplete) return;
+               sendAborted(reason, desc);
+       }
+       
        public void sendAborted(int reason, String desc) throws 
NotConnectedException {
                _usm.send(_destination, DMT.createSendAborted(_uid, reason, 
desc), _ctr);
        }

Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java    2009-01-15 15:32:04 UTC (rev 
25063)
+++ trunk/freenet/src/freenet/node/Node.java    2009-01-15 20:25:08 UTC (rev 
25064)
@@ -62,7 +62,9 @@
 import freenet.io.comm.Peer;
 import freenet.io.comm.PeerParseException;
 import freenet.io.comm.ReferenceSignatureVerificationException;
+import freenet.io.comm.RetrievalException;
 import freenet.io.comm.UdpSocketHandler;
+import freenet.io.xfer.BlockReceiver;
 import freenet.io.xfer.PartiallyReceivedBlock;
 import freenet.keys.CHKBlock;
 import freenet.keys.CHKVerifyException;
@@ -4059,4 +4061,113 @@
                if(container == null) return NullLinkFixer.instance;
                return container;
        }
+
+       /**
+        * Make a running request sender into a turtle request.
+        * Backoff: when the transfer finishes, or 30 seconds after entering turtle
+        * mode if it has not finished by then (see RequestSender.setTurtle()).
+        * Downstream: Cancel all dependent RequestHandlers and local requests.
+        * This also removes it from the load management code.
+        * Registration: We track the turtles for each peer, and overall. No two turtles from the
+        * same node may share the same key, and there is an overall limit.
+        * If the transfer has already ended (the sender no longer has a source peer)
+        * nothing is registered and nothing is aborted.
+        * @param sender The request sender whose incoming transfer should be turtled.
+        */
+       public void makeTurtle(RequestSender sender) {
+               // Read the source peer once, before registering anything. The receiving thread
+               // clears it when the transfer ends, so it may already be null when this runs;
+               // dereferencing it later would throw and leak the global registration.
+               PeerNode from = sender.transferringFrom();
+               if(from == null) {
+                       Logger.normal(this, "Not turtling "+sender+" : transfer already finished");
+                       return;
+               }
+               // Registration
+               // FIXME check the datastore.
+               if(!this.registerTurtleTransfer(sender)) {
+                       // Too many turtles running, or already two turtles for this key (we allow two in case one peer turtles as a DoS).
+                       sender.killTurtle();
+                       return;
+               }
+               if(!from.registerTurtleTransfer(sender)) {
+                       // Too many turtles running, or already a turtle for this key.
+                       // Abort it, and undo the global registration made above.
+                       unregisterTurtleTransfer(sender);
+                       sender.killTurtle();
+                       return;
+               }
+               // Do not transfer coalesce!!
+               synchronized(transferringRequestSenders) {
+                       transferringRequestSenders.remove((NodeCHK)sender.key);
+               }
+               
+               // Abort downstream transfers, set the turtle mode flag and set up the backoff callback.
+               sender.setTurtle();
+       }
+
+       // Overall cap. registerTurtleTransfer() compares it against turtlingTransfers.size(),
+       // i.e. the number of distinct keys with a turtle, not the number of senders.
+       private static int MAX_TURTLES = 10;
+       // Maximum number of senders that may be registered under one key.
+       private static int MAX_TURTLES_PER_KEY = 2;
+       
+       // Global turtle register: key -> senders currently turtling that key.
+       // The register/unregister methods below access it under synchronized(turtlingTransfers).
+       private HashMap<Key,RequestSender[]> turtlingTransfers = new 
HashMap<Key,RequestSender[]>();
+       
+       /**
+        * Add a sender to the global turtle register.
+        * Refused when the register already holds MAX_TURTLES keys, when the key already
+        * has MAX_TURTLES_PER_KEY senders, or when this very sender is already registered
+        * for the key (the last case is logged as an error).
+        * @param sender The sender to register; its key field selects the slot.
+        * @return true if the sender was added, false if it was refused.
+        */
+       private boolean registerTurtleTransfer(RequestSender sender) {
+               final Key key = sender.key;
+               synchronized(turtlingTransfers) {
+                       if(turtlingTransfers.size() >= MAX_TURTLES) return false;
+                       if(!turtlingTransfers.containsKey(key)) {
+                               // First turtle for this key.
+                               RequestSender[] single = { sender };
+                               turtlingTransfers.put(key, single);
+                               return true;
+                       }
+                       final RequestSender[] current = turtlingTransfers.get(key);
+                       if(current.length >= MAX_TURTLES_PER_KEY) return false;
+                       for(RequestSender registered : current) {
+                               if(registered != sender) continue;
+                               Logger.error(this, "Registering turtle for "+sender+" : "+key+" twice! (globally)");
+                               return false;
+                       }
+                       // Append by copying into an array one element longer.
+                       final int oldCount = current.length;
+                       RequestSender[] grown = new RequestSender[oldCount + 1];
+                       System.arraycopy(current, 0, grown, 0, oldCount);
+                       grown[oldCount] = sender;
+                       turtlingTransfers.put(key, grown);
+                       return true;
+               }
+       }
+       
+       /**
+        * Remove a sender from the global turtle register.
+        * Every occurrence of the sender under its key is removed; the key itself is
+        * dropped once no senders remain. Inconsistencies (unknown key, sender not
+        * registered under the key, sender registered more than once) are logged as
+        * errors rather than thrown, whatever the size of the key's array.
+        * @param sender The sender to remove; its key field selects the slot.
+        */
+       public void unregisterTurtleTransfer(RequestSender sender) {
+               Key key = sender.key;
+               synchronized(turtlingTransfers) {
+                       RequestSender[] senders = turtlingTransfers.get(key);
+                       if(senders == null) {
+                               Logger.error(this, "Removing turtle "+sender+" for "+key+" : DOES NOT EXIST IN GLOBAL TURTLES LIST");
+                               return;
+                       }
+                       // Count occurrences so all array sizes share one code path. A separate
+                       // two-element shortcut used to return silently when the sender was absent.
+                       int x = 0;
+                       for(RequestSender s : senders) {
+                               if(s == sender) x++;
+                       }
+                       if(x == 0) {
+                               Logger.error(this, "Turtle not in global register: "+sender+" for "+key);
+                               return;
+                       }
+                       if(x > 1)
+                               Logger.error(this, "Lots of copies of turtle: "+x);
+                       if(senders.length == x) {
+                               // Nothing else registered for this key.
+                               turtlingTransfers.remove(key);
+                               return;
+                       }
+                       RequestSender[] newSenders = new RequestSender[senders.length - x];
+                       int idx = 0;
+                       for(RequestSender s : senders) {
+                               if(s == sender) continue;
+                               newSenders[idx++] = s;
+                       }
+                       turtlingTransfers.put(key, newSenders);
+               }
+       }
+
+
 }

Modified: trunk/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeClientCore.java  2009-01-15 15:32:04 UTC 
(rev 25063)
+++ trunk/freenet/src/freenet/node/NodeClientCore.java  2009-01-15 20:25:08 UTC 
(rev 25064)
@@ -556,6 +556,10 @@
                                if(listener != null)
                                        listener.completed(status == 
RequestSender.SUCCESS);
                        }
+
+                       /** Deliberately a no-op for this listener. */
+                       public void onAbortDownstreamTransfers(int reason, 
String desc) {
+                               // Ignore, onRequestSenderFinished will also be 
called.
+                               // (RequestSender.sendAbortDownstreamTransfers() invokes both callbacks
+                               // on each listener, so completion is handled there.)
+                       }
                }, tag);
        }
 

Modified: trunk/freenet/src/freenet/node/PeerManager.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerManager.java     2009-01-15 15:32:04 UTC 
(rev 25063)
+++ trunk/freenet/src/freenet/node/PeerManager.java     2009-01-15 20:25:08 UTC 
(rev 25064)
@@ -896,6 +896,10 @@
                                        Logger.minor(this, "Skipping (not 
connected): " + p.getPeer());
                                continue;
                        }
+                       if(key != null && p.isTurtling(key)) {
+                               if(logMINOR)
+                                       Logger.minor(this, "Skipping (already turtling key): "+p.getPeer());
+                               // Actually skip the peer, like the other "Skipping" checks in this loop;
+                               // without this the peer was logged as skipped but still considered.
+                               continue;
+                       }
                        if(minVersion > 0 && 
Version.getArbitraryBuildNumber(p.getVersion(), -1) < minVersion) {
                                if(logMINOR)
                                        Logger.minor(this, "Skipping old 
version: " + p.getPeer());

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java        2009-01-15 15:32:04 UTC 
(rev 25063)
+++ trunk/freenet/src/freenet/node/PeerNode.java        2009-01-15 20:25:08 UTC 
(rev 25064)
@@ -4266,4 +4266,40 @@
                if(logMINOR) Logger.minor(this, "getReusableTrackerID(): 
"+cur.packets.trackerID+" on "+this);
                return cur.packets.trackerID;
        }
+
+       // Maximum number of entries registerTurtleTransfer() allows in turtlingTransfers.
+       static final int MAX_TURTLES_PER_PEER = 3;
+       
+       // Turtled transfers from this peer: key -> the single sender turtling that key.
+       // The methods below access it under synchronized(turtlingTransfers).
+       private HashMap<Key,RequestSender> turtlingTransfers = new 
HashMap<Key,RequestSender>();
+       
+       /**
+        * Register a turtled transfer from this peer.
+        * @param sender The sender to register under its key.
+        * @return false if this peer already has MAX_TURTLES_PER_PEER turtles or already
+        * has one for the sender's key; true once the sender has been recorded.
+        */
+       public boolean registerTurtleTransfer(RequestSender sender) {
+               final Key key = sender.key;
+               synchronized(turtlingTransfers) {
+                       final boolean full = turtlingTransfers.size() >= MAX_TURTLES_PER_PEER;
+                       if(full || turtlingTransfers.containsKey(key)) {
+                               return false;
+                       }
+                       turtlingTransfers.put(key, sender);
+                       return true;
+               }
+       }
+
+       /**
+        * Remove a turtled transfer from this peer's register.
+        * The entry is removed only if it maps the sender's key to this exact sender;
+        * a missing key or a different sender is logged as an error and the map is
+        * left as it was.
+        * @param sender The sender to remove.
+        */
+       public void unregisterTurtleTransfer(RequestSender sender) {
+               final Key key = sender.key;
+               synchronized(turtlingTransfers) {
+                       if(!turtlingTransfers.containsKey(key)) {
+                               Logger.error(this, "Removing turtle transfer "+sender+" for "+key+" from "+this+" : DOES NOT EXIST");
+                               return;
+                       }
+                       // Look before removing, so a mismatch never has to be put back.
+                       final RequestSender registered = turtlingTransfers.get(key);
+                       if(registered == sender) {
+                               turtlingTransfers.remove(key);
+                       } else {
+                               Logger.error(this, "Removing turtle transfer "+sender+" for "+key+" from "+this+" : WRONG SENDER: "+registered);
+                       }
+               }
+       }
+
+       /**
+        * @param key The key to look up.
+        * @return true if a turtled transfer is currently registered for this key on
+        * this peer.
+        */
+       public boolean isTurtling(Key key) {
+               synchronized(turtlingTransfers) {
+                       return turtlingTransfers.containsKey(key);
+               }
+       }
 }

Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java  2009-01-15 15:32:04 UTC 
(rev 25063)
+++ trunk/freenet/src/freenet/node/RequestHandler.java  2009-01-15 20:25:08 UTC 
(rev 25064)
@@ -201,6 +201,21 @@
                        Logger.normal(this, "requestor is gone, can't begin CHK 
transfer");
                }
        }
+       
+       /**
+        * Listener callback: abort the transfer this handler is sending downstream.
+        * Records the abort on the tag, then asks the block transmitter to abort.
+        * If there is no transmitter (bt is null) this only logs an error.
+        * @param reason Reason code recorded on the tag and passed to abortSend().
+        * @param desc Description recorded on the tag and passed to abortSend().
+        */
+       public void onAbortDownstreamTransfers(int reason, String desc) {
+               if(bt == null) {
+                       Logger.error(this, "No downstream transfer to abort! on 
"+this);
+                       return;
+               }
+               if(logMINOR)
+                       Logger.minor(this, "Aborting downstream transfer on 
"+this);
+               // Record on the tag first, so the reason is kept even if the send below fails.
+               tag.onAbortDownstreamTransfers(reason, desc);
+               try {
+                       bt.abortSend(reason, desc);
+               } catch (NotConnectedException e) {
+                       // Ignore
+                       // (presumably the requestor is gone, so there is nobody to notify - confirm)
+               }
+       }
 
        private void waitAndFinishCHKTransferOffThread() {
                node.executor.execute(new Runnable() {

Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java   2009-01-15 15:32:04 UTC 
(rev 25063)
+++ trunk/freenet/src/freenet/node/RequestSender.java   2009-01-15 20:25:08 UTC 
(rev 25064)
@@ -74,6 +74,9 @@
     private byte[] sskData;
     private SSKBlock block;
     private boolean hasForwarded;
+    private PeerNode transferringFrom;
+    private boolean turtleMode;
+    private boolean sentBackoffTurtle;
     
     /** If true, only try to fetch the key from nodes which have offered it */
     private boolean tryOffersOnly;
@@ -775,12 +778,49 @@
                                
                                try {
                                        if(logMINOR) Logger.minor(this, 
"Receiving data");
-                                       byte[] data = br.receive();
+                                       final PeerNode from = next;
+                                       synchronized(this) {
+                                               transferringFrom = next;
+                                       }
+                                       node.getTicker().queueTimedJob(new 
Runnable() {
+
+                                                               public void 
run() {
+                                                                       
synchronized(RequestSender.this) {
+                                                                               
if(transferringFrom != from) return;
+                                                                       }
+                                                                       
node.makeTurtle(RequestSender.this);
+                                                               }
+                                               
+                                       }, 60*1000);
+                                       byte[] data;
+                                       try {
+                                               data = br.receive();
+                                       } finally {
+                                               synchronized(this) {
+                                                       transferringFrom = null;
+                                               }
+                                       }
+                                       
                                        long tEnd = System.currentTimeMillis();
                                        this.transferTime = tEnd - tStart;
-                                       if(!br.tookTooLong())
+                                       boolean turtle;
+                                       boolean turtleBackedOff;
+                                       synchronized(this) {
+                                               turtle = turtleMode;
+                                               turtleBackedOff = 
sentBackoffTurtle;
+                                               sentBackoffTurtle = true;
+                                       }
+                                       if(!turtle)
                                                next.transferSuccess();
+                                       else {
+                                               if(!turtleBackedOff)
+                                                       
next.transferFailed("Turtled transfer");
+                                       }
                                next.successNotOverload();
+                               if(turtle) {
+                                       next.unregisterTurtleTransfer(this);
+                                       node.unregisterTurtleTransfer(this);
+                               }
                                        if(logMINOR) Logger.minor(this, 
"Received data");
                                        // Received data
                                        try {
@@ -802,8 +842,20 @@
                                                        
next.localRejectedOverload("TransferFailedRequest"+e.getReason());
                                        finish(TRANSFER_FAILED, next, false);
                                        node.failureTable.onFinalFailure(key, 
next, htl, FailureTable.REJECT_TIME, source);
-                                       if(!br.tookTooLong())
-                                               
next.transferFailed("RequestSenderTransferFailed");
+                                       // Read the flag under the lock, as every other access to turtleMode does.
+                                       boolean inTurtleMode;
+                                       synchronized(this) {
+                                               inTurtleMode = turtleMode;
+                                       }
+                                       if(!inTurtleMode) {
+                                               int reason = e.getReason();
+                                               // Grouped explicitly: && binds tighter than ||, so without the
+                                               // grouping the senderAborted() check only guarded SENDER_DIED and
+                                               // the other three reasons backed off even on a sender abort.
+                                               boolean timeoutLikeReason =
+                                                               reason == RetrievalException.SENDER_DIED
+                                                               || reason == RetrievalException.RECEIVER_DIED
+                                                               || reason == RetrievalException.TIMED_OUT
+                                                               || reason == RetrievalException.UNABLE_TO_SEND_BLOCK_WITHIN_TIMEOUT;
+                                               if((!br.senderAborted()) && timeoutLikeReason) {
+                                                       // Looks like a timeout. Backoff.
+                                                       next.transferFailed(e.getMessage());
+                                               } else {
+                                                       // Quick failure (in that we didn't have to timeout). Don't backoff.
+                                                       // Treat as a DNF.
+                                                       node.failureTable.onFinalFailure(key, next, htl, FailureTable.REJECT_TIME, source);
+                                               }
+                                       }
                                        return;
                                }
                        } finally {
@@ -1308,6 +1360,9 @@
                void onCHKTransferBegins();
                /** Should return quickly, allocate a thread if it needs to 
block etc */
                void onRequestSenderFinished(int status);
+               /** Abort downstream transfers (not necessarily upstream ones, 
so not via the PRB).
+                * Should return quickly, allocate a thread if it needs to 
block etc. */
+               void onAbortDownstreamTransfers(int reason, String desc);
        }
        
        public void addListener(Listener l) {
@@ -1316,10 +1371,13 @@
                boolean reject=false;
                boolean transfer=false;
                boolean sentFinished;
+               boolean sentTransferCancel = false;
                int status;
                synchronized (this) {
                        synchronized (listeners) {
-                               listeners.add(l);
+                               sentTransferCancel = 
sentAbortDownstreamTransfers;
+                               if(!sentTransferCancel)
+                                       listeners.add(l);
                                reject = sentReceivedRejectOverload;
                                transfer = sentCHKTransferBegins;
                                sentFinished = sentRequestSenderFinished;
@@ -1332,6 +1390,8 @@
                        l.onReceivedRejectOverload();
                if (transfer)
                        l.onCHKTransferBegins();
+               if(sentTransferCancel)
+                       
l.onAbortDownstreamTransfers(abortDownstreamTransfersReason, 
abortDownstreamTransfersDesc);
                if (status!=NOT_FINISHED && sentFinished)
                        l.onRequestSenderFinished(status);
        }
@@ -1382,7 +1442,56 @@
                }
        }
 
+       private boolean sentAbortDownstreamTransfers;
+       private int abortDownstreamTransfersReason;
+       private String abortDownstreamTransfersDesc;
+       
+       /**
+        * Tell every registered listener to abort its downstream transfer and that
+        * this sender finished with TRANSFER_FAILED, then drop all listeners.
+        * The reason and description are stored first so that addListener() can
+        * replay the abort to listeners that register afterwards.
+        * Listener callbacks run while holding the listeners lock; anything they
+        * throw is logged and does not stop the remaining listeners.
+        * @param reason Reason code handed to each listener.
+        * @param desc Description handed to each listener.
+        */
+       private void sendAbortDownstreamTransfers(int reason, String desc) {
+               synchronized (listeners) {
+                       abortDownstreamTransfersReason = reason;
+                       abortDownstreamTransfersDesc = desc;
+                       sentAbortDownstreamTransfers = true;
+                       for (Listener l : listeners) {
+                               try {
+                                       l.onAbortDownstreamTransfers(reason, 
desc);
+                                       
l.onRequestSenderFinished(TRANSFER_FAILED);
+                               } catch (Throwable t) {
+                                       Logger.error(this, "Caught: "+t, t);
+                               }
+                       }
+                       // Listeners have been told we finished; they get no further callbacks.
+                       listeners.clear();
+               }
+       }
+       
        public int getPriority() {
                return NativeThread.HIGH_PRIORITY;
        }
+
+       /**
+        * Switch this sender to turtle mode: set the flag, abort all downstream
+        * transfers with GONE_TO_TURTLE_MODE, and schedule a job that backs off the
+        * source peer after 30 seconds unless the backoff has already been sent.
+        * The source peer is captured here under the lock. The receive path clears
+        * the transferringFrom field when the transfer ends, so the timed job must
+        * not read the field itself.
+        */
+       public void setTurtle() {
+               final PeerNode from;
+               synchronized(this) {
+                       this.turtleMode = true;
+                       from = transferringFrom;
+               }
+               sendAbortDownstreamTransfers(RetrievalException.GONE_TO_TURTLE_MODE, "Turtling");
+               // Transfer already over: there is no peer left to back off.
+               if(from == null) return;
+               node.getTicker().queueTimedJob(new Runnable() {
+
+                       public void run() {
+                               synchronized(RequestSender.this) {
+                                       if(sentBackoffTurtle) return;
+                                       sentBackoffTurtle = true;
+                               }
+                               from.transferFailed("Turtled transfer");
+                       }
+                       
+               }, 30*1000);
+       }
+
+       /**
+        * @return the peer we are currently receiving the data from, or null when no
+        * transfer is in progress. Synchronized because every write to the field
+        * happens under this object's monitor; an unlocked read might see a stale value.
+        */
+       public synchronized PeerNode transferringFrom() {
+               return transferringFrom;
+       }
+
+       /**
+        * Kill a transfer that could not be registered as a turtle: aborts prb with
+        * TURTLE_KILLED and reports a final failure for the key to the failure table.
+        */
+       public void killTurtle() {
+               prb.abort(RetrievalException.TURTLE_KILLED, "Too many turtles / 
already have turtles for this key");
+               // NOTE(review): transferringFrom() is null once the transfer has ended -
+               // confirm onFinalFailure() accepts a null peer.
+               node.failureTable.onFinalFailure(key, transferringFrom(), htl, 
FailureTable.REJECT_TIME, source);
+       }
+
 }

Modified: trunk/freenet/src/freenet/node/RequestTag.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestTag.java      2009-01-15 15:32:04 UTC 
(rev 25063)
+++ trunk/freenet/src/freenet/node/RequestTag.java      2009-01-15 20:25:08 UTC 
(rev 25064)
@@ -25,6 +25,9 @@
        int requestSenderFinishedCode;
        Throwable handlerThrew;
        boolean rejected;
+       boolean abortedDownstreamTransfer;
+       int abortedDownstreamReason;
+       String abortedDownstreamDesc;
 
        public RequestTag(boolean isSSK, START start) {
                super();
@@ -73,10 +76,22 @@
                sb.append(" finishedCode=").append(requestSenderFinishedCode);
                sb.append(" rejected=").append(rejected);
                sb.append(" thrown=").append(handlerThrew);
+               if(abortedDownstreamTransfer) {
+                       sb.append(" abortedDownstreamTransfer reason=");
+                       sb.append(abortedDownstreamReason);
+                       sb.append(" desc=");
+                       sb.append(abortedDownstreamDesc);
+               }
                if(handlerThrew != null)
                        Logger.error(this, sb.toString(), handlerThrew);
                else
                        Logger.error(this, sb.toString());
        }
 
+       /**
+        * Remember that the downstream transfer was aborted, and why, so that the
+        * details show up in the tag's error log output.
+        * @param reason Reason code of the abort.
+        * @param desc Description of the abort.
+        */
+       public void onAbortDownstreamTransfers(int reason, String desc) {
+               this.abortedDownstreamTransfer = true;
+               this.abortedDownstreamReason = reason;
+               this.abortedDownstreamDesc = desc;
+       }
+
 }

_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to