Author: toad
Date: 2008-02-02 20:05:30 +0000 (Sat, 02 Feb 2008)
New Revision: 17479

Modified:
   trunk/freenet/src/freenet/io/comm/DMT.java
   trunk/freenet/src/freenet/node/CHKInsertHandler.java
   trunk/freenet/src/freenet/node/FailureTable.java
   trunk/freenet/src/freenet/node/Node.java
   trunk/freenet/src/freenet/node/NodeClientCore.java
   trunk/freenet/src/freenet/node/NodeDispatcher.java
   trunk/freenet/src/freenet/node/NodeStats.java
   trunk/freenet/src/freenet/node/PeerNode.java
   trunk/freenet/src/freenet/node/RequestHandler.java
   trunk/freenet/src/freenet/node/RequestSender.java
   trunk/freenet/src/freenet/node/RequestStarter.java
   trunk/freenet/src/freenet/node/SSKInsertHandler.java
Log:
Another big chunk of ULPRs.

Modified: trunk/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/DMT.java  2008-02-02 17:34:22 UTC (rev 
17478)
+++ trunk/freenet/src/freenet/io/comm/DMT.java  2008-02-02 20:05:30 UTC (rev 
17479)
@@ -858,6 +858,39 @@
                return msg;
        }

+       // Short timeout so PRIORITY_HIGH
+       public static MessageType FNPGetOfferedKey = new 
MessageType("FNPGetOfferedKey", PRIORITY_HIGH) {{
+               addField(KEY, Key.class);
+               addField(OFFER_AUTHENTICATOR, ShortBuffer.class);
+               addField(NEED_PUB_KEY, Boolean.class);
+               addField(UID, Long.class);
+       }};
+       
+       public static Message createFNPGetOfferedKey(Key key, byte[] 
authenticator, boolean needPubkey, long uid) {
+               Message msg = new Message(FNPGetOfferedKey);
+               msg.set(KEY, key);
+               msg.set(OFFER_AUTHENTICATOR, new ShortBuffer(authenticator));
+               msg.set(NEED_PUB_KEY, needPubkey);
+               msg.set(UID, uid);
+               return msg;
+       }
+       
+       // Permanently rejected. RejectedOverload means temporarily rejected.
+       public static MessageType FNPGetOfferedKeyInvalid = new 
MessageType("FNPGetOfferedKeyInvalid", PRIORITY_HIGH) {{ // short timeout
+               addField(UID, Long.class);
+               addField(REASON, Short.class);
+       }};
+       
+       public static Message createFNPGetOfferedKeyInvalid(long uid, short 
reason) {
+               Message msg = new Message(FNPGetOfferedKeyInvalid);
+               msg.set(UID, uid);
+               msg.set(REASON, reason);
+               return msg;
+       }
+       
+       public static short GET_OFFERED_KEY_REJECTED_BAD_AUTHENTICATOR = 1;
+       public static short GET_OFFERED_KEY_REJECTED_NO_KEY = 2;
+       
        public static final MessageType FNPPing = new MessageType("FNPPing", 
PRIORITY_HIGH) {{
                addField(PING_SEQNO, Integer.class);
        }};

Modified: trunk/freenet/src/freenet/node/CHKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertHandler.java        2008-02-02 
17:34:22 UTC (rev 17478)
+++ trunk/freenet/src/freenet/node/CHKInsertHandler.java        2008-02-02 
20:05:30 UTC (rev 17479)
@@ -80,7 +80,7 @@
             Logger.error(this, "Caught in run() "+t, t);
         } finally {
                if(logMINOR) Logger.minor(this, "Exiting CHKInsertHandler.run() 
for "+uid);
-            node.unlockUID(uid, false, true, false);
+            node.unlockUID(uid, false, true, false, false);
         }
     }


Modified: trunk/freenet/src/freenet/node/FailureTable.java
===================================================================
--- trunk/freenet/src/freenet/node/FailureTable.java    2008-02-02 17:34:22 UTC 
(rev 17478)
+++ trunk/freenet/src/freenet/node/FailureTable.java    2008-02-02 20:05:30 UTC 
(rev 17479)
@@ -4,10 +4,19 @@
 package freenet.node;

 import java.lang.ref.WeakReference;
+import java.util.Vector;

+import freenet.io.comm.DMT;
+import freenet.io.comm.Message;
+import freenet.io.comm.NotConnectedException;
+import freenet.io.xfer.BlockTransmitter;
+import freenet.io.xfer.PartiallyReceivedBlock;
+import freenet.keys.CHKBlock;
 import freenet.keys.Key;
 import freenet.keys.KeyBlock;
 import freenet.keys.NodeCHK;
+import freenet.keys.NodeSSK;
+import freenet.keys.SSKBlock;
 import freenet.support.LRUHashtable;

 // FIXME it is ESSENTIAL that we delete the ULPR data on requestors etc once 
we have found the key.
@@ -140,7 +149,7 @@
                        this.offers = new BlockOffer[] { offer };
                }

-               public long expires() {
+               public synchronized long expires() {
                        long last = 0;
                        for(int i=0;i<offers.length;i++) {
                                if(offers[i].offeredTime > last) last = 
offers[i].offeredTime;
@@ -148,26 +157,62 @@
                        return last + OFFER_EXPIRY_TIME;
                }

-               public boolean isEmpty(long now) {
+               public synchronized boolean isEmpty(long now) {
                        for(int i=0;i<offers.length;i++) {
                                if(offers[i].offeredTime > now) return false;
                        }
                        return true;
                }
+
+               public synchronized void deleteOffer(BlockOffer offer) {
+                       int idx = -1;
+                       for(int i=0;i<offers.length;i++) {
+                               if(offers[i] == offer) idx = i;
+                       }
+                       if(idx == -1) return;
+                       BlockOffer[] newOffers = new 
BlockOffer[offers.length-1];
+                       if(idx > 0)
+                               System.arraycopy(offers, 0, newOffers, 0, idx);
+                       if(idx < newOffers.length)
+                               System.arraycopy(offers, idx+1, newOffers, idx, 
offers.length-idx);
+                       offers = newOffers;
+               }
+
+               public synchronized void addOffer(BlockOffer offer) {
+                       BlockOffer[] newOffers = new 
BlockOffer[offers.length+1];
+                       System.arraycopy(offers, 0, newOffers, 0, 
offers.length);
+                       newOffers[offers.length] = offer;
+                       offers = newOffers;
+               }
        }

-       private final class BlockOffer {
+       final class BlockOffer {
                final long offeredTime;
                /** Either offered by or offered to this node */
                final WeakReference nodeRef;
                /** Authenticator */
                final byte[] authenticator;
+               /** Boot ID when the offer was made */
+               final long bootID;

-               BlockOffer(PeerNode pn, long now, byte[] authenticator) {
+               BlockOffer(PeerNode pn, long now, byte[] authenticator, long 
bootID) {
                        this.nodeRef = pn.myRef;
                        this.offeredTime = now;
                        this.authenticator = authenticator;
+                       this.bootID = bootID;
                }
+
+               public PeerNode getPeerNode() {
+                       return (PeerNode) nodeRef.get();
+               }
+
+               public boolean isExpired(long now) {
+                       return now > (offeredTime + OFFER_EXPIRY_TIME);
+               }
+
+               public boolean isExpired() {
+                       return isExpired(System.currentTimeMillis());
+               }
        }

        /**
@@ -181,6 +226,7 @@
                        entry = (FailureTableEntry) entriesByKey.get(key);
                        if(entry == null) return; // Nobody cares
                        entriesByKey.removeKey(key);
+                       blockOfferListByKey.removeKey(key);
                }
                entry.offer();
        }
@@ -250,9 +296,11 @@
                        // Add to offers list

                        BlockOfferList bl = (BlockOfferList) 
blockOfferListByKey.get(key);
-                       BlockOffer offer = new BlockOffer(peer, now, 
authenticator);
+                       BlockOffer offer = new BlockOffer(peer, now, 
authenticator, peer.getBootID());
                        if(bl == null) {
                                bl = new BlockOfferList(entry, offer);
+                       } else {
+                               bl.addOffer(offer);
                        }
                        blockOfferListByKey.push(key, offer);
                        trimOffersList(now);
@@ -276,4 +324,116 @@
                        }
                }
        }
+
+       /**
+        * We offered a key, a node has responded to the offer. Note that this 
runs on the incoming
+        * packets thread so should allocate a new thread if it does anything 
heavy.
+        * @param key The key to send.
+        * @param isSSK Whether it is an SSK.
+        * @param uid The UID.
+        * @param source The node that asked for the key.
+        * @throws NotConnectedException If the sender ceases to be connected.
+        */
+       public void sendOfferedKey(Key key, boolean isSSK, boolean needPubKey, 
long uid, PeerNode source) throws NotConnectedException {
+               if(isSSK) {
+                       SSKBlock block = node.fetch((NodeSSK)key, false);
+                       if(block == null) {
+                               // Don't have the key
+                               
source.sendAsync(DMT.createFNPGetOfferedKeyInvalid(uid, 
DMT.GET_OFFERED_KEY_REJECTED_NO_KEY), null, 0, null);
+                               return;
+                       }
+                       Message df = DMT.createFNPSSKDataFound(uid, 
block.getRawHeaders(), block.getRawData());
+                       source.sendAsync(df, null, 0, null);
+                       if(needPubKey) {
+                               Message pk = DMT.createFNPSSKPubKey(uid, 
block.getPubKey());
+                               source.sendAsync(pk, null, 0, null);
+                       }
+               } else {
+                       CHKBlock block = node.fetch((NodeCHK)key, false);
+                       if(block == null) {
+                               // Don't have the key
+                               
source.sendAsync(DMT.createFNPGetOfferedKeyInvalid(uid, 
DMT.GET_OFFERED_KEY_REJECTED_NO_KEY), null, 0, null);
+                               return;
+                       }
+                       Message df = DMT.createFNPCHKDataFound(uid, 
block.getRawHeaders());
+                       source.sendAsync(df, null, 0, null);
+               PartiallyReceivedBlock prb =
+                       new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, 
Node.PACKET_SIZE, block.getRawData());
+               BlockTransmitter bt =
+                       new BlockTransmitter(node.usm, source, uid, prb, 
node.outputThrottle, null);
+               bt.sendAsync(node.executor);
+               }
+       }
+       
+       class OfferList {
+
+               OfferList(BlockOfferList offerList) {
+                       this.offerList = offerList;
+                       recentOffers = new Vector();
+                       expiredOffers = new Vector();
+                       long now = System.currentTimeMillis();
+                       BlockOffer[] offers = offerList.offers;
+                       for(int i=0;i<offers.length;i++) {
+                               if(!offers[i].isExpired(now))
+                                       recentOffers.add(offers[i]);
+                               else
+                                       expiredOffers.add(offers[i]);
+                       }
+               }
+               
+               private final BlockOfferList offerList;
+               
+               private final Vector recentOffers;
+               private final Vector expiredOffers;
+               
+               /** The last offer we returned */
+               private BlockOffer lastOffer;
+               
+               public BlockOffer getFirstOffer() {
+                       if(lastOffer != null) {
+                               throw new IllegalStateException("Last offer not 
dealt with");
+                       }
+                       if(!recentOffers.isEmpty()) {
+                               int x = 
node.random.nextInt(recentOffers.size());
+                               return lastOffer = (BlockOffer) 
recentOffers.remove(x);
+                       }
+                       if(!expiredOffers.isEmpty()) {
+                               int x = 
node.random.nextInt(expiredOffers.size());
+                               return lastOffer = (BlockOffer) 
expiredOffers.remove(x);
+                       }
+                       // No more offers.
+                       return null;
+               }
+               
+               /**
+                * Delete the last offer - we have used it, successfully or not.
+                */
+               public void deleteLastOffer() {
+                       offerList.deleteOffer(lastOffer);
+                       lastOffer = null;
+               }
+
+               /**
+                * Keep the last offer - we weren't able to use it e.g. because 
of RejectedOverload.
+                * Maybe it will be useful again in the future.
+                */
+               public void keepLastOffer() {
+                       lastOffer = null;
+               }
+               
+       }
+
+       public OfferList getOffers(Key key) {
+               BlockOfferList bl;
+               synchronized(this) {
+                       bl = (BlockOfferList) blockOfferListByKey.get(key);
+                       if(bl == null) return null;
+               }
+               return new OfferList(bl);
+       }
+
+       /** Called when a node disconnects */
+       public void onDisconnect(final PeerNode pn) {
+               // FIXME do something (off thread if expensive)
+       }
 }

Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java    2008-02-02 17:34:22 UTC (rev 
17478)
+++ trunk/freenet/src/freenet/node/Node.java    2008-02-02 20:05:30 UTC (rev 
17479)
@@ -319,6 +319,8 @@
        private final HashSet runningSSKGetUIDs;
        private final HashSet runningCHKPutUIDs;
        private final HashSet runningSSKPutUIDs;
+       private final HashSet runningCHKOfferReplyUIDs;
+       private final HashSet runningSSKOfferReplyUIDs;

        /** Semi-unique ID for swap requests. Used to identify us so that the
         * topology can be reconstructed. */
@@ -613,6 +615,8 @@
                runningSSKGetUIDs = new HashSet();
                runningCHKPutUIDs = new HashSet();
                runningSSKPutUIDs = new HashSet();
+               runningCHKOfferReplyUIDs = new HashSet();
+               runningSSKOfferReplyUIDs = new HashSet();

                // Directory for node-related files other than store

@@ -2179,10 +2183,10 @@
                return is;
        }

-       public boolean lockUID(long uid, boolean ssk, boolean insert) {
+       public boolean lockUID(long uid, boolean ssk, boolean insert, boolean 
offerReply) {
                if(logMINOR) Logger.minor(this, "Locking "+uid);
                Long l = new Long(uid);
-               HashSet set = getUIDTracker(ssk, insert);
+               HashSet set = getUIDTracker(ssk, insert, offerReply);
                synchronized(set) {
                        set.add(l);
                }
@@ -2193,11 +2197,11 @@
                }
        }

-       public void unlockUID(long uid, boolean ssk, boolean insert, boolean 
canFail) {
+       public void unlockUID(long uid, boolean ssk, boolean insert, boolean 
canFail, boolean offerReply) {
                if(logMINOR) Logger.minor(this, "Unlocking "+uid);
                Long l = new Long(uid);
                completed(uid);
-               HashSet set = getUIDTracker(ssk, insert);
+               HashSet set = getUIDTracker(ssk, insert, offerReply);
                synchronized(set) {
                        set.remove(l);
                }
@@ -2207,10 +2211,14 @@
                }
        }

-       HashSet getUIDTracker(boolean ssk, boolean insert) {
+       HashSet getUIDTracker(boolean ssk, boolean insert, boolean offerReply) {
                if(ssk) {
+                       if(offerReply)
+                               return runningSSKOfferReplyUIDs;
                        return insert ? runningSSKPutUIDs : runningSSKGetUIDs;
                } else {
+                       if(offerReply)
+                               return runningCHKOfferReplyUIDs;
                        return insert ? runningCHKPutUIDs : runningCHKGetUIDs;
                }
        }
@@ -2289,6 +2297,14 @@
                return runningCHKPutUIDs.size();
        }

+       public int getNumSSKOfferReplies() {
+               return runningSSKOfferReplyUIDs.size();
+       }
+       
+       public int getNumCHKOfferReplies() {
+               return runningCHKOfferReplyUIDs.size();
+       }
+       
        public int getNumTransferringRequestSenders() {
                synchronized(transferringRequestSenders) {
                        return transferringRequestSenders.size();

Modified: trunk/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeClientCore.java  2008-02-02 17:34:22 UTC 
(rev 17478)
+++ trunk/freenet/src/freenet/node/NodeClientCore.java  2008-02-02 20:05:30 UTC 
(rev 17479)
@@ -450,7 +450,7 @@
                logMINOR = Logger.shouldLog(Logger.MINOR, this);
                long startTime = System.currentTimeMillis();
                long uid = random.nextLong();
-               if(!node.lockUID(uid, false, false)) {
+               if(!node.lockUID(uid, false, false, false)) {
                        Logger.error(this, "Could not lock UID just randomly 
generated: "+uid+" - probably indicates broken PRNG");
                        throw new 
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR);
                }
@@ -507,7 +507,8 @@
                                                (status == 
RequestSender.RECENTLY_FAILED) ||
                                                (status == 
RequestSender.SUCCESS) ||
                                                (status == 
RequestSender.ROUTE_NOT_FOUND) ||
-                                               (status == 
RequestSender.VERIFY_FAILURE))) {
+                                               (status == 
RequestSender.VERIFY_FAILURE) ||
+                                               (status == 
RequestSender.GET_OFFER_VERIFY_FAILURE))) {
                                        long rtt = System.currentTimeMillis() - 
startTime;
                                        if(!rejectedOverload)
                                                
requestStarters.requestCompleted(false, false);
@@ -539,8 +540,10 @@
                                case RequestSender.ROUTE_NOT_FOUND:
                                        throw new 
LowLevelGetException(LowLevelGetException.ROUTE_NOT_FOUND);
                                case RequestSender.TRANSFER_FAILED:
+                               case RequestSender.GET_OFFER_TRANSFER_FAILED:
                                        throw new 
LowLevelGetException(LowLevelGetException.TRANSFER_FAILED);
                                case RequestSender.VERIFY_FAILURE:
+                               case RequestSender.GET_OFFER_VERIFY_FAILURE:
                                        throw new 
LowLevelGetException(LowLevelGetException.VERIFY_FAILED);
                                case RequestSender.GENERATED_REJECTED_OVERLOAD:
                                case RequestSender.TIMED_OUT:
@@ -554,7 +557,7 @@
                        }
                }
                } finally {
-                       node.unlockUID(uid, false, false, true);
+                       node.unlockUID(uid, false, false, true, false);
                }
        }

@@ -562,7 +565,7 @@
                logMINOR = Logger.shouldLog(Logger.MINOR, this);
                long startTime = System.currentTimeMillis();
                long uid = random.nextLong();
-               if(!node.lockUID(uid, true, false)) {
+               if(!node.lockUID(uid, true, false, false)) {
                        Logger.error(this, "Could not lock UID just randomly 
generated: "+uid+" - probably indicates broken PRNG");
                        throw new 
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR);
                }
@@ -620,7 +623,8 @@
                                                (status == 
RequestSender.RECENTLY_FAILED) ||
                                                (status == 
RequestSender.SUCCESS) ||
                                                (status == 
RequestSender.ROUTE_NOT_FOUND) ||
-                                               (status == 
RequestSender.VERIFY_FAILURE))) {
+                                               (status == 
RequestSender.VERIFY_FAILURE) ||
+                                               (status == 
RequestSender.GET_OFFER_VERIFY_FAILURE))) {
                                        long rtt = System.currentTimeMillis() - 
startTime;

                                        if(!rejectedOverload)
@@ -651,9 +655,11 @@
                                case RequestSender.ROUTE_NOT_FOUND:
                                        throw new 
LowLevelGetException(LowLevelGetException.ROUTE_NOT_FOUND);
                                case RequestSender.TRANSFER_FAILED:
+                               case RequestSender.GET_OFFER_TRANSFER_FAILED:
                                        Logger.error(this, "WTF? Transfer 
failed on an SSK? on "+uid);
                                        throw new 
LowLevelGetException(LowLevelGetException.TRANSFER_FAILED);
                                case RequestSender.VERIFY_FAILURE:
+                               case RequestSender.GET_OFFER_VERIFY_FAILURE:
                                        throw new 
LowLevelGetException(LowLevelGetException.VERIFY_FAILED);
                                case RequestSender.GENERATED_REJECTED_OVERLOAD:
                                case RequestSender.TIMED_OUT:
@@ -666,7 +672,7 @@
                        }
                }
                } finally {
-                       node.unlockUID(uid, true, false, true);
+                       node.unlockUID(uid, true, false, true, false);
                }
        }

@@ -686,7 +692,7 @@
                PartiallyReceivedBlock prb = new 
PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, Node.PACKET_SIZE, data);
                CHKInsertSender is;
                long uid = random.nextLong();
-               if(!node.lockUID(uid, false, true)) {
+               if(!node.lockUID(uid, false, true, false)) {
                        Logger.error(this, "Could not lock UID just randomly 
generated: "+uid+" - probably indicates broken PRNG");
                        throw new 
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
                }
@@ -795,7 +801,7 @@
                        }
                }
                } finally {
-                       node.unlockUID(uid, false, true, true);
+                       node.unlockUID(uid, false, true, true, false);
                }
        }

@@ -803,7 +809,7 @@
                logMINOR = Logger.shouldLog(Logger.MINOR, this);
                SSKInsertSender is;
                long uid = random.nextLong();
-               if(!node.lockUID(uid, true, true)) {
+               if(!node.lockUID(uid, true, true, false)) {
                        Logger.error(this, "Could not lock UID just randomly 
generated: "+uid+" - probably indicates broken PRNG");
                        throw new 
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
                }
@@ -921,7 +927,7 @@
                        }
                }
                } finally {
-                       node.unlockUID(uid, true, true, true);
+                       node.unlockUID(uid, true, true, true, false);
                }
        }


Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java  2008-02-02 17:34:22 UTC 
(rev 17478)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java  2008-02-02 20:05:30 UTC 
(rev 17479)
@@ -15,6 +15,7 @@
 import freenet.io.comm.NotConnectedException;
 import freenet.io.comm.Peer;
 import freenet.keys.Key;
+import freenet.keys.NodeSSK;
 import freenet.support.Fields;
 import freenet.support.Logger;
 import freenet.support.ShortBuffer;
@@ -160,7 +161,9 @@
 //                     return handleProbeTrace(m, source);
                } else if(spec == DMT.FNPOfferKey) {
                        return handleOfferKey(m, source);
-               } 
+               } else if(spec == DMT.FNPGetOfferedKey) {
+                       return handleGetOfferedKey(m, source);
+               }
                return false;
        }

@@ -171,6 +174,52 @@
                return true;
        }

+       private boolean handleGetOfferedKey(Message m, PeerNode source) {
+               Key key = (Key) m.getObject(DMT.KEY);
+               byte[] authenticator = ((ShortBuffer) 
m.getObject(DMT.OFFER_AUTHENTICATOR)).getData();
+               long uid = m.getLong(DMT.UID);
+               HMAC hash = new HMAC(SHA256.getInstance());
+               if(!hash.verify(node.failureTable.offerAuthenticatorKey, 
key.getFullKey(), authenticator)) {
+                       Logger.error(this, "Invalid offer from "+source+" : 
authenticator did not verify");
+                       try {
+                               
source.sendAsync(DMT.createFNPGetOfferedKeyInvalid(uid, 
DMT.GET_OFFERED_KEY_REJECTED_BAD_AUTHENTICATOR), null, 0, null);
+                       } catch (NotConnectedException e) {
+                               // Too bad.
+                       }
+                       return true;
+               }
+               if(logMINOR) Logger.minor(this, "Valid GetOfferedKey for 
"+key+" from "+source);
+               
+               // Do we want it? We can RejectOverload if we don't have the 
bandwidth...
+               boolean isSSK = key instanceof NodeSSK;
+               boolean needPubKey = m.getBoolean(DMT.NEED_PUB_KEY);
+               if(isSSK) {
+                       
+               }
+               String reject = 
+                       nodeStats.shouldRejectRequest(true, false, isSSK, 
false, true, source);
+               if(reject != null) {
+                       Logger.normal(this, "Rejecting FNPGetOfferedKey from 
"+source+" for "+key+" : "+reject);
+                       Message rejected = DMT.createFNPRejectedOverload(uid, 
true);
+                       try {
+                               source.sendAsync(rejected, null, 0, null);
+                       } catch (NotConnectedException e) {
+                               Logger.normal(this, "Rejecting (overload) data 
request from "+source.getPeer()+": "+e);
+                       }
+                       node.unlockUID(uid, isSSK, false, false, false);
+                       return true;
+               }
+               
+               // Accept it.
+               
+               try {
+                       node.failureTable.sendOfferedKey(key, isSSK, 
needPubKey, uid, source);
+               } catch (NotConnectedException e) {
+                       // Too bad.
+               }
+               return true;
+       }
+
        private void handleDisconnect(final Message m, final PeerNode source) {
                // Must run ON the packet sender thread as it sends a packet 
directly
                node.getTicker().queueTimedJob(new FastRunnable() {
@@ -229,7 +278,7 @@
                        }
                        return true;
                }
-               if(!node.lockUID(id, isSSK, false)) {
+               if(!node.lockUID(id, isSSK, false, false)) {
                        if(logMINOR) Logger.minor(this, "Could not lock ID 
"+id+" -> rejecting (already running)");
                        Message rejected = DMT.createFNPRejectedLoop(id);
                        try {
@@ -241,7 +290,7 @@
                } else {
                        if(logMINOR) Logger.minor(this, "Locked "+id);
                }
-               String rejectReason = nodeStats.shouldRejectRequest(!isSSK, 
false, isSSK, false, source);
+               String rejectReason = nodeStats.shouldRejectRequest(!isSSK, 
false, isSSK, false, false, source);
                if(rejectReason != null) {
                        // can accept 1 CHK request every so often, but not 
with SSKs because they aren't throttled so won't sort out bwlimitDelayTime, 
which was the whole reason for accepting them when overloaded...
                        Logger.normal(this, "Rejecting request from 
"+source.getPeer()+" preemptively because "+rejectReason);
@@ -251,7 +300,7 @@
                        } catch (NotConnectedException e) {
                                Logger.normal(this, "Rejecting (overload) data 
request from "+source.getPeer()+": "+e);
                        }
-                       node.unlockUID(id, isSSK, false, false);
+                       node.unlockUID(id, isSSK, false, false, false);
                        return true;
                }
                //if(!node.lockUID(id)) return false;
@@ -271,7 +320,7 @@
                        }
                        return true;
                }
-               if(!node.lockUID(id, isSSK, true)) {
+               if(!node.lockUID(id, isSSK, true, false)) {
                        if(logMINOR) Logger.minor(this, "Could not lock ID 
"+id+" -> rejecting (already running)");
                        Message rejected = DMT.createFNPRejectedLoop(id);
                        try {
@@ -282,7 +331,7 @@
                        return true;
                }
                // SSKs don't fix bwlimitDelayTime so shouldn't be accepted 
when overloaded.
-               String rejectReason = nodeStats.shouldRejectRequest(!isSSK, 
true, isSSK, false, source);
+               String rejectReason = nodeStats.shouldRejectRequest(!isSSK, 
true, isSSK, false, false, source);
                if(rejectReason != null) {
                        Logger.normal(this, "Rejecting insert from 
"+source.getPeer()+" preemptively because "+rejectReason);
                        Message rejected = DMT.createFNPRejectedOverload(id, 
true);
@@ -291,7 +340,7 @@
                        } catch (NotConnectedException e) {
                                Logger.normal(this, "Rejecting (overload) 
insert request from "+source.getPeer()+": "+e);
                        }
-                       node.unlockUID(id, isSSK, true, false);
+                       node.unlockUID(id, isSSK, true, false, false);
                        return true;
                }
                long now = System.currentTimeMillis();

Modified: trunk/freenet/src/freenet/node/NodeStats.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeStats.java       2008-02-02 17:34:22 UTC 
(rev 17478)
+++ trunk/freenet/src/freenet/node/NodeStats.java       2008-02-02 20:05:30 UTC 
(rev 17479)
@@ -113,10 +113,14 @@
        final TimeDecayingRunningAverage successfulSskFetchBytesSentAverage;
        final TimeDecayingRunningAverage successfulChkInsertBytesSentAverage;
        final TimeDecayingRunningAverage successfulSskInsertBytesSentAverage;
+       final TimeDecayingRunningAverage 
successfulChkOfferReplyBytesSentAverage;
+       final TimeDecayingRunningAverage 
successfulSskOfferReplyBytesSentAverage;
        final TimeDecayingRunningAverage successfulChkFetchBytesReceivedAverage;
        final TimeDecayingRunningAverage successfulSskFetchBytesReceivedAverage;
        final TimeDecayingRunningAverage 
successfulChkInsertBytesReceivedAverage;
        final TimeDecayingRunningAverage 
successfulSskInsertBytesReceivedAverage;
+       final TimeDecayingRunningAverage 
successfulChkOfferReplyBytesReceivedAverage;
+       final TimeDecayingRunningAverage 
successfulSskOfferReplyBytesReceivedAverage;

        final TrivialRunningAverage globalFetchPSuccess;
        final TrivialRunningAverage chkFetchPSuccess;
@@ -312,10 +316,14 @@
                successfulSskFetchBytesSentAverage = new 
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024, 
throttleFS == null ? null : 
throttleFS.subset("SuccessfulSskFetchBytesSentAverage"), node);
                successfulChkInsertBytesSentAverage = new 
TimeDecayingRunningAverage(32768+32768+1024, 180000, 0.0, 1024*1024*1024, 
throttleFS == null ? null : 
throttleFS.subset("SuccessfulChkInsertBytesSentAverage"), node);
                successfulSskInsertBytesSentAverage = new 
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024, 
throttleFS == null ? null : 
throttleFS.subset("SuccessfulSskInsertBytesSentAverage"), node);
+               successfulChkOfferReplyBytesSentAverage = new 
TimeDecayingRunningAverage(32768+500, 180000, 0.0, 1024*1024*1024, throttleFS 
== null ? null : throttleFS.subset("successfulChkOfferReplyBytesSentAverage"), 
node);
+               successfulSskOfferReplyBytesSentAverage = new 
TimeDecayingRunningAverage(3072, 180000, 0.0, 1024*1024*1024, throttleFS == 
null ? null : throttleFS.subset("successfulSskOfferReplyBytesSentAverage"), 
node);
                successfulChkFetchBytesReceivedAverage = new 
TimeDecayingRunningAverage(32768+1024+500+2048/*path folding*/, 180000, 0.0, 
1024*1024*1024, throttleFS == null ? null : 
throttleFS.subset("SuccessfulChkFetchBytesReceivedAverage"), node);
                successfulSskFetchBytesReceivedAverage = new 
TimeDecayingRunningAverage(2048+500, 180000, 0.0, 1024*1024*1024, throttleFS == 
null ? null : throttleFS.subset("SuccessfulSskFetchBytesReceivedAverage"), 
node);
                successfulChkInsertBytesReceivedAverage = new 
TimeDecayingRunningAverage(32768+1024+500, 180000, 0.0, 1024*1024*1024, 
throttleFS == null ? null : 
throttleFS.subset("SuccessfulChkInsertBytesReceivedAverage"), node);
                successfulSskInsertBytesReceivedAverage = new 
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024, 
throttleFS == null ? null : 
throttleFS.subset("SuccessfulSskInsertBytesReceivedAverage"), node);
+               successfulChkOfferReplyBytesReceivedAverage = new 
TimeDecayingRunningAverage(32768+500, 180000, 0.0, 1024*1024*1024, throttleFS 
== null ? null : 
throttleFS.subset("successfulChkOfferReplyBytesReceivedAverage"), node);
+               successfulSskOfferReplyBytesReceivedAverage = new 
TimeDecayingRunningAverage(3072, 180000, 0.0, 1024*1024*1024, throttleFS == 
null ? null : throttleFS.subset("successfulSskOfferReplyBytesReceivedAverage"), 
node);

                globalFetchPSuccess = new TrivialRunningAverage();
                chkFetchPSuccess = new TrivialRunningAverage();
@@ -390,7 +398,7 @@
        };

        /* return reject reason as string if should reject, otherwise return 
null */
-       public String shouldRejectRequest(boolean canAcceptAnyway, boolean 
isInsert, boolean isSSK, boolean isLocal, PeerNode source) {
+       public String shouldRejectRequest(boolean canAcceptAnyway, boolean 
isInsert, boolean isSSK, boolean isLocal, boolean isOfferReply, PeerNode 
source) {
                logMINOR = Logger.shouldLog(Logger.MINOR, this);
                if(logMINOR) dumpByteCostAverages();

@@ -460,6 +468,8 @@
                int numSSKRequests = node.getNumSSKRequests() + 1;
                int numCHKInserts = node.getNumCHKInserts() + 1;
                int numSSKInserts = node.getNumSSKInserts() + 1;
+               int numCHKOfferReplies = node.getNumCHKOfferReplies() + 1;
+               int numSSKOfferReplies = node.getNumSSKOfferReplies() + 1;
                if(logMINOR)
                        Logger.minor(this, "Running (adjusted): CHK fetch 
"+numCHKRequests+" SSK fetch "+numSSKRequests+" CHK insert "+numCHKInserts+" 
SSK insert "+numSSKInserts);

@@ -612,7 +622,9 @@
                                " CHK insert 
"+successfulChkInsertBytesSentAverage.currentValue()+ '/' 
+successfulChkInsertBytesReceivedAverage.currentValue()+
                                " SSK insert 
"+successfulSskInsertBytesSentAverage.currentValue()+ '/' 
+successfulSskInsertBytesReceivedAverage.currentValue()+
                                " CHK fetch 
"+successfulChkFetchBytesSentAverage.currentValue()+ '/' 
+successfulChkFetchBytesReceivedAverage.currentValue()+
-                               " SSK fetch 
"+successfulSskFetchBytesSentAverage.currentValue()+ '/' 
+successfulSskFetchBytesReceivedAverage.currentValue());
+                               " SSK fetch 
"+successfulSskFetchBytesSentAverage.currentValue()+ '/' 
+successfulSskFetchBytesReceivedAverage.currentValue()+
+                               " CHK offer reply 
"+successfulChkOfferReplyBytesSentAverage.currentValue()+ '/' 
+successfulChkOfferReplyBytesReceivedAverage.currentValue()+
+                               " SSK offer reply 
"+successfulSskOfferReplyBytesSentAverage.currentValue()+ '/' 
+successfulSskOfferReplyBytesReceivedAverage.currentValue());

        }

@@ -693,10 +705,14 @@
                fs.put("SuccessfulSskFetchBytesSentAverage", 
successfulSskFetchBytesSentAverage.exportFieldSet(true));
                fs.put("SuccessfulChkInsertBytesSentAverage", 
successfulChkInsertBytesSentAverage.exportFieldSet(true));
                fs.put("SuccessfulSskInsertBytesSentAverage", 
successfulSskInsertBytesSentAverage.exportFieldSet(true));
+               fs.put("SuccessfulChkOfferReplyBytesSentAverage", 
successfulChkOfferReplyBytesSentAverage.exportFieldSet(true));
+               fs.put("SuccessfulSskOfferReplyBytesSentAverage", 
successfulSskOfferReplyBytesSentAverage.exportFieldSet(true));                
                fs.put("SuccessfulChkFetchBytesReceivedAverage", 
successfulChkFetchBytesReceivedAverage.exportFieldSet(true));
                fs.put("SuccessfulSskFetchBytesReceivedAverage", 
successfulSskFetchBytesReceivedAverage.exportFieldSet(true));
                fs.put("SuccessfulChkInsertBytesReceivedAverage", 
successfulChkInsertBytesReceivedAverage.exportFieldSet(true));
                fs.put("SuccessfulSskInsertBytesReceivedAverage", 
successfulSskInsertBytesReceivedAverage.exportFieldSet(true));
+               fs.put("SuccessfulChkOfferReplyBytesReceivedAverage", 
successfulChkOfferReplyBytesReceivedAverage.exportFieldSet(true));
+               fs.put("SuccessfulSskOfferReplyBytesReceivedAverage", 
successfulSskOfferReplyBytesReceivedAverage.exportFieldSet(true));              
  

                //These are not really part of the 'throttling' data, but are 
also running averages which should be persisted
                fs.put("AverageCacheLocation", 
avgCacheLocation.exportFieldSet(true));

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java        2008-02-02 17:34:22 UTC 
(rev 17478)
+++ trunk/freenet/src/freenet/node/PeerNode.java        2008-02-02 20:05:30 UTC 
(rev 17479)
@@ -1116,6 +1116,7 @@
                final long now = System.currentTimeMillis();
                Logger.normal(this, "Disconnected " + this);
                node.usm.onDisconnect(this);
+               node.failureTable.onDisconnect(this);
                node.peers.disconnected(this);
                boolean ret;
                synchronized(this) {

Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java  2008-02-02 17:34:22 UTC 
(rev 17478)
+++ trunk/freenet/src/freenet/node/RequestHandler.java  2008-02-02 20:05:30 UTC 
(rev 17479)
@@ -84,11 +84,11 @@
         } catch (NotConnectedException e) {
                Logger.normal(this, "requestor gone, could not start request 
handler wait");
                        node.removeTransferringRequestHandler(uid);
-            node.unlockUID(uid, key instanceof NodeSSK, false, false);
+            node.unlockUID(uid, key instanceof NodeSSK, false, false, false);
         } catch (Throwable t) {
             Logger.error(this, "Caught "+t, t);
                        node.removeTransferringRequestHandler(uid);
-            node.unlockUID(uid, key instanceof NodeSSK, false, false);
+            node.unlockUID(uid, key instanceof NodeSSK, false, false, false);
         }
     }

@@ -288,6 +288,7 @@
                        }
                                        return;
                case RequestSender.VERIFY_FAILURE:
+               case RequestSender.GET_OFFER_VERIFY_FAILURE:
                        if(key instanceof NodeCHK) {
                                                if(bt == null) {
                                        // Bug! This is impossible!
@@ -305,6 +306,7 @@
                        sendTerminal(reject);
                        return;
                case RequestSender.TRANSFER_FAILED:
+               case RequestSender.GET_OFFER_TRANSFER_FAILED:
                        if(key instanceof NodeCHK) {
                                if(bt == null) {
                                        // Bug! This is impossible!
@@ -372,7 +374,7 @@

        private void unregisterRequestHandlerWithNode() {
                node.removeTransferringRequestHandler(uid);
-               node.unlockUID(uid, key instanceof NodeSSK, false, false);
+               node.unlockUID(uid, key instanceof NodeSSK, false, false, 
false);
        }

        /**

Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java   2008-02-02 17:34:22 UTC 
(rev 17478)
+++ trunk/freenet/src/freenet/node/RequestSender.java   2008-02-02 20:05:30 UTC 
(rev 17479)
@@ -28,6 +28,8 @@
 import freenet.keys.NodeSSK;
 import freenet.keys.SSKBlock;
 import freenet.keys.SSKVerifyException;
+import freenet.node.FailureTable.BlockOffer;
+import freenet.node.FailureTable.OfferList;
 import freenet.store.KeyCollisionException;
 import freenet.support.Logger;
 import freenet.support.ShortBuffer;
@@ -49,6 +51,7 @@

     // Constants
     static final int ACCEPTED_TIMEOUT = 5000;
+    static final int GET_OFFER_TIMEOUT = 10000;
     static final int FETCH_TIMEOUT = 120000;
     /** Wait up to this long to get a path folding reply */
     static final int OPENNET_TIMEOUT = 120000;
@@ -72,7 +75,10 @@
     private byte[] sskData;
     private SSKBlock block;
     private boolean hasForwarded;
-       
+    
+    /** If true, only try to fetch the key from nodes which have offered it */
+    private boolean tryOffersOnly;
+    
        private ArrayList listeners=new ArrayList();

     // Terminal status
@@ -89,6 +95,8 @@
     static final int GENERATED_REJECTED_OVERLOAD = 7;
     static final int INTERNAL_ERROR = 8;
     static final int RECENTLY_FAILED = 9;
+    static final int GET_OFFER_VERIFY_FAILURE = 10;
+    static final int GET_OFFER_TRANSFER_FAILED = 11;
     private PeerNode successFrom;

     static String getStatusString(int status) {
@@ -103,8 +111,12 @@
                return "DATA NOT FOUND";
        case TRANSFER_FAILED:
                return "TRANSFER FAILED";
+       case GET_OFFER_TRANSFER_FAILED:
+               return "GET OFFER TRANSFER FAILED";
        case VERIFY_FAILURE:
                return "VERIFY FAILURE";
+       case GET_OFFER_VERIFY_FAILURE:
+               return "GET OFFER VERIFY FAILURE";
        case TIMED_OUT:
                return "TIMED OUT";
        case GENERATED_REJECTED_OVERLOAD:
@@ -158,7 +170,7 @@
                realRun();
         } catch (Throwable t) {
             Logger.error(this, "Caught "+t, t);
-            finish(INTERNAL_ERROR, null);
+            finish(INTERNAL_ERROR, null, false);
         } finally {
                if(logMINOR) Logger.minor(this, "Leaving RequestSender.run() 
for "+uid);
             node.removeRequestSender(key, origHTL, this);
@@ -171,6 +183,191 @@
                pubKey = ((NodeSSK)key).getPubKey();
         }

+        // First ask any nodes that have offered the data
+        
+        OfferList offers = node.failureTable.getOffers(key);
+        
+        while(true) {
+               // Fetches valid offers, then expired ones. Expired offers 
don't count towards failures,
+               // but they're still worth trying.
+               BlockOffer offer = offers.getFirstOffer();
+               if(offer == null) break;
+               PeerNode pn = offer.getPeerNode();
+               if(pn == null) {
+                       offers.deleteLastOffer();
+                       continue;
+               }
+               if(pn.getBootID() != offer.bootID) {
+                       offers.deleteLastOffer();
+                       continue;
+               }
+               Message msg = DMT.createFNPGetOfferedKey(key, 
offer.authenticator, pubKey == null, uid);
+               try {
+                               pn.sendAsync(msg, null, 0, this);
+                       } catch (NotConnectedException e2) {
+                               if(logMINOR)
+                                       Logger.minor(this, "Disconnected: 
"+pn+" getting offer for "+key);
+                               offers.deleteLastOffer();
+                               continue;
+                       }
+               MessageFilter mfRO = 
MessageFilter.create().setSource(pn).setField(DMT.UID, 
uid).setTimeout(GET_OFFER_TIMEOUT).setType(DMT.FNPRejectedOverload);
+               MessageFilter mfGetInvalid = 
MessageFilter.create().setSource(pn).setField(DMT.UID, 
uid).setTimeout(GET_OFFER_TIMEOUT).setType(DMT.FNPGetOfferedKeyInvalid);
+               // Wait for a response.
+               if(key instanceof NodeCHK) {
+                       // Headers first, then block transfer.
+                       MessageFilter mfDF = 
MessageFilter.create().setSource(pn).setField(DMT.UID, 
uid).setTimeout(GET_OFFER_TIMEOUT).setType(DMT.FNPCHKDataFound);
+                       Message reply;
+                               try {
+                                       reply = 
node.usm.waitFor(mfDF.or(mfRO.or(mfGetInvalid)), this);
+                               } catch (DisconnectedException e2) {
+                                       if(logMINOR)
+                                               Logger.minor(this, 
"Disconnected: "+pn+" getting offer for "+key);
+                                       offers.deleteLastOffer();
+                                       continue;
+                               }
+                       if(reply == null) {
+                               // We gave it a chance, don't give it another.
+                               offers.deleteLastOffer();
+                               continue;
+                       } else if(reply.getSpec() == DMT.FNPRejectedOverload) {
+                               // Non-fatal, keep it.
+                               if(logMINOR)
+                                       Logger.minor(this, "Node "+pn+" 
rejected FNPGetOfferedKey for "+key+" (expired="+offer.isExpired());
+                               offers.keepLastOffer();
+                               continue;
+                       } else if(reply.getSpec() == 
DMT.FNPGetOfferedKeyInvalid) {
+                               // Fatal, delete it.
+                               if(logMINOR)
+                                       Logger.minor(this, "Node "+pn+" 
rejected FNPGetOfferedKey as invalid with reason "+reply.getShort(DMT.REASON));
+                               offers.deleteLastOffer();
+                               continue;
+                       } else if(reply.getSpec() == DMT.FNPCHKDataFound) {
+                               headers = 
((ShortBuffer)reply.getObject(DMT.BLOCK_HEADERS)).getData();
+                               // Receive the data
+                               
+                       // FIXME: Validate headers
+                       
+                       node.addTransferringSender((NodeCHK)key, this);
+                       
+                       try {
+                               
+                               prb = new 
PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, Node.PACKET_SIZE);
+                               
+                               synchronized(this) {
+                                       notifyAll();
+                               }
+                               fireCHKTransferBegins();
+                                               
+                               BlockReceiver br = new BlockReceiver(node.usm, 
pn, uid, prb, this);
+                               
+                               try {
+                                       if(logMINOR) Logger.minor(this, 
"Receiving data");
+                                       byte[] data = br.receive();
+                                       if(logMINOR) Logger.minor(this, 
"Received data");
+                                       // Received data
+                                       try {
+                                               verifyAndCommit(data);
+                                       } catch (KeyVerifyException e1) {
+                                               Logger.normal(this, "Got data 
but verify failed: "+e1, e1);
+                                               
finish(GET_OFFER_VERIFY_FAILURE, pn, true);
+                                       node.failureTable.onFailure(key, htl, 
new PeerNode[] { source }, pn, -1, System.currentTimeMillis());
+                                       offers.deleteLastOffer();
+                                               return;
+                                       }
+                                       finish(SUCCESS, pn, true);
+                                       return;
+                               } catch (RetrievalException e) {
+                                                       if 
(e.getReason()==RetrievalException.SENDER_DISCONNECTED)
+                                                               
Logger.normal(this, "Transfer failed (disconnect): "+e, e);
+                                                       else
+                                                               
Logger.error(this, "Transfer failed 
("+e.getReason()+"/"+RetrievalException.getErrString(e.getReason())+"): "+e, e);
+                                       finish(GET_OFFER_TRANSFER_FAILED, pn, 
true);
+                               node.failureTable.onFailure(key, htl, new 
PeerNode[] { source }, pn, -1, System.currentTimeMillis());
+                               offers.deleteLastOffer();
+                                       return;
+                               }
+                       } finally {
+                               node.removeTransferringSender((NodeCHK)key, 
this);
+                       }
+                       }
+               } else {
+                       // Data, possibly followed by pubkey
+                       MessageFilter mfDF = 
MessageFilter.create().setSource(pn).setField(DMT.UID, 
uid).setTimeout(GET_OFFER_TIMEOUT).setType(DMT.FNPSSKDataFound);
+                       Message reply;
+                               try {
+                                       reply = 
node.usm.waitFor(mfDF.or(mfRO.or(mfGetInvalid)), this);
+                               } catch (DisconnectedException e) {
+                                       if(logMINOR)
+                                               Logger.minor(this, 
"Disconnected: "+pn+" getting offer for "+key);
+                                       offers.deleteLastOffer();
+                                       continue;
+                               }
+                       if(reply == null) {
+                       offers.deleteLastOffer();
+                               continue;
+                       } else if(reply.getSpec() == DMT.FNPRejectedOverload) {
+                               // Non-fatal, keep it.
+                               if(logMINOR)
+                                       Logger.minor(this, "Node "+pn+" 
rejected FNPGetOfferedKey for "+key+" (expired="+offer.isExpired());
+                               offers.keepLastOffer();
+                               continue;
+                       } else if(reply.getSpec() == 
DMT.FNPGetOfferedKeyInvalid) {
+                               // Fatal, delete it.
+                               if(logMINOR)
+                                       Logger.minor(this, "Node "+pn+" 
rejected FNPGetOfferedKey as invalid with reason "+reply.getShort(DMT.REASON));
+                               offers.deleteLastOffer();
+                               continue;
+                       } else if(reply.getSpec() == DMT.FNPSSKDataFound) {
+                               // Receive the data
+                               headers = ((ShortBuffer) 
reply.getObject(DMT.BLOCK_HEADERS)).getData();
+                               sskData = ((ShortBuffer) 
reply.getObject(DMT.DATA)).getData();
+                               if(pubKey != null) {
+                                       MessageFilter mfPK = 
MessageFilter.create().setSource(pn).setField(DMT.UID, 
uid).setTimeout(GET_OFFER_TIMEOUT).setType(DMT.FNPSSKPubKey);
+                                       Message pk;
+                                               try {
+                                                       pk = 
node.usm.waitFor(mfPK, this);
+                                               } catch (DisconnectedException 
e) {
+                                                       if(logMINOR)
+                                                               
Logger.minor(this, "Disconnected: "+pn+" getting pubkey for offer for "+key);
+                                                       
offers.deleteLastOffer();
+                                                       continue;
+                                               }
+                                       if(pk == null) {
+                                               Logger.error(this, "Got data 
but not pubkey from "+pn+" for offer for "+key);
+                                               offers.deleteLastOffer();
+                                               continue;
+                                       }
+                                       try {
+                                                       pubKey = 
DSAPublicKey.create(((ShortBuffer)pk.getObject(DMT.PUBKEY_AS_BYTES)).getData());
+                                               } catch (CryptFormatException 
e) {
+                                                       Logger.error(this, 
"Bogus pubkey from "+pn+" for offer for "+key+" : "+e, e);
+                                               offers.deleteLastOffer();
+                                                       continue;
+                                               }
+                               }
+                               
+                               try {
+                                               
((NodeSSK)key).setPubKey(pubKey);
+                                       } catch (SSKVerifyException e) {
+                                               Logger.error(this, "Bogus SSK 
data from "+pn+" for offer for "+key+" : "+e, e);
+                                       offers.deleteLastOffer();
+                                               continue;
+                                       }
+                               
+                               if(finishSSKFromGetOffer(pn)) {
+                                       if(logMINOR) Logger.minor(this, 
"Successfully fetched SSK from offer from "+pn+" for "+key);
+                                       return;
+                               } else {
+                               offers.deleteLastOffer();
+                                       continue;
+                               }
+                       }
+               }
+               // RejectedOverload is possible - but we need to include it in 
the statistics.
+               // We don't remove the offer in that case. Otherwise we do, 
even if it fails.
+               // FNPGetOfferedKeyInvalid is also possible.
+        }
+        
                int routeAttempts=0;
                int rejectOverloads=0;
         HashSet nodesRoutedTo = new HashSet();
@@ -180,7 +377,7 @@
             if(htl == 0) {
                // This used to be RNF, I dunno why
                                //???: finish(GENERATED_REJECTED_OVERLOAD, 
null);
-                finish(DATA_NOT_FOUND, null);
+                finish(DATA_NOT_FOUND, null, false);
                        node.failureTable.onFailure(key, htl, new PeerNode[] { 
source }, null, FailureTable.REJECT_TIME, System.currentTimeMillis());
                 return;
             }
@@ -195,7 +392,7 @@
                                if (logMINOR && rejectOverloads>0)
                                        Logger.minor(this, "no more peers, but 
overloads ("+rejectOverloads+"/"+routeAttempts+" overloaded)");
                 // Backtrack
-                finish(ROUTE_NOT_FOUND, null);
+                finish(ROUTE_NOT_FOUND, null, false);
                        node.failureTable.onFailure(key, htl, new PeerNode[] { 
source }, null, -1, System.currentTimeMillis());
                 return;
             }
@@ -347,7 +544,7 @@
                        // Fatal timeout
                        next.localRejectedOverload("FatalTimeout");
                        forwardRejectedOverload();
-                       finish(TIMED_OUT, next);
+                       finish(TIMED_OUT, next, false);
                        node.failureTable.onFailure(key, htl, new PeerNode[] { 
source }, next, -1, System.currentTimeMillis());
                        return;
                }
@@ -358,7 +555,7 @@

                if(msg.getSpec() == DMT.FNPDataNotFound) {
                        next.successNotOverload();
-                       finish(DATA_NOT_FOUND, next);
+                       finish(DATA_NOT_FOUND, next, false);
                        node.failureTable.onFailure(key, htl, new PeerNode[] { 
source }, next, FailureTable.REJECT_TIME, System.currentTimeMillis());
                        return;
                }
@@ -420,7 +617,7 @@
                                // Kill the request, regardless of whether 
there is timeout left.
                        // If there is, we will avoid sending requests for the 
specified period.
                        // FIXME we need to create the FT entry.
-                               finish(RECENTLY_FAILED, next);
+                               finish(RECENTLY_FAILED, next, false);
                        node.failureTable.onFailure(key, htl, new PeerNode[] { 
source }, next, timeLeft, System.currentTimeMillis());
                        return;
                }
@@ -486,18 +683,18 @@
                                                verifyAndCommit(data);
                                        } catch (KeyVerifyException e1) {
                                                Logger.normal(this, "Got data 
but verify failed: "+e1, e1);
-                                               finish(VERIFY_FAILURE, next);
+                                               finish(VERIFY_FAILURE, next, 
false);
                                        node.failureTable.onFailure(key, htl, 
new PeerNode[] { source }, next, -1, System.currentTimeMillis());
                                                return;
                                        }
-                                       finish(SUCCESS, next);
+                                       finish(SUCCESS, next, false);
                                        return;
                                } catch (RetrievalException e) {
                                                        if 
(e.getReason()==RetrievalException.SENDER_DISCONNECTED)
                                                                
Logger.normal(this, "Transfer failed (disconnect): "+e, e);
                                                        else
                                                                
Logger.error(this, "Transfer failed 
("+e.getReason()+"/"+RetrievalException.getErrString(e.getReason())+"): "+e, e);
-                                       finish(TRANSFER_FAILED, next);
+                                       finish(TRANSFER_FAILED, next, false);
                                node.failureTable.onFailure(key, htl, new 
PeerNode[] { source }, next, -1, System.currentTimeMillis());
                                        return;
                                }
@@ -570,18 +767,41 @@
                        node.storeShallow(block);
                        if(node.random.nextInt(RANDOM_REINSERT_INTERVAL) == 0)
                                node.queueRandomReinsert(block);
-                       finish(SUCCESS, next);
+                       finish(SUCCESS, next, false);
                } catch (SSKVerifyException e) {
                        Logger.error(this, "Failed to verify: "+e+" from 
"+next, e);
-                       finish(VERIFY_FAILURE, next);
+                       finish(VERIFY_FAILURE, next, false);
                        return;
                } catch (KeyCollisionException e) {
                        Logger.normal(this, "Collision on "+this);
-                       finish(SUCCESS, next);
+                       finish(SUCCESS, next, false);
                }
        }

     /**
+     * Finish fetching an SSK. We must have received the data, the headers and 
the pubkey by this point.
+     * @param next The node we received the data from.
+     * @return True if the request has completed. False if we need to look 
elsewhere.
+     */
+       private boolean finishSSKFromGetOffer(PeerNode next) {
+       try {
+                       block = new SSKBlock(sskData, headers, (NodeSSK)key, 
false);
+                       node.storeShallow(block);
+                       if(node.random.nextInt(RANDOM_REINSERT_INTERVAL) == 0)
+                               node.queueRandomReinsert(block);
+                       finish(SUCCESS, next, true);
+                       return true;
+               } catch (SSKVerifyException e) {
+                       Logger.error(this, "Failed to verify (from get offer): 
"+e+" from "+next, e);
+                       return false;
+               } catch (KeyCollisionException e) {
+                       Logger.normal(this, "Collision (from get offer) on 
"+this);
+                       finish(SUCCESS, next, true);
+                       return false;
+               }
+       }
+
+    /**
      * Note that this must be first on the list.
      */
        private MessageFilter makeDataFoundFilter(PeerNode next) {
@@ -696,7 +916,7 @@
         }            
     }

-    private void finish(int code, PeerNode next) {
+    private void finish(int code, PeerNode next, boolean fromOfferedKey) {
        if(logMINOR) Logger.minor(this, "finish("+code+ ')');

         synchronized(this) {
@@ -710,18 +930,21 @@
                if(next != null) {
                        next.onSuccess(false, key instanceof NodeSSK);
                }
-               node.nodeStats.requestCompleted(true, source != null, key 
instanceof NodeSSK);
+               // FIXME should this be called when fromOfferedKey??
+                       node.nodeStats.requestCompleted(true, source != null, 
key instanceof NodeSSK);

                        //NOTE: because of the requesthandler implementation, 
this will block and wait
                        //      for downstream transfers on a CHK. The opennet 
stuff introduces
                        //      a delay of it's own if we don't get the 
expected message.
                        fireRequestSenderFinished(code);

-               if(key instanceof NodeCHK && next != null && 
-                               (next.isOpennet() || 
node.passOpennetRefsThroughDarknet()) ) {
-                       finishOpennet(next);
-               } else
-                       finishOpennetNull(next);
+                       if(fromOfferedKey) {
+                               if(key instanceof NodeCHK && next != null && 
+                                               (next.isOpennet() || 
node.passOpennetRefsThroughDarknet()) ) {
+                                       finishOpennet(next);
+                               } else
+                                       finishOpennetNull(next);
+                       }
         } else {
                node.nodeStats.requestCompleted(false, source != null, key 
instanceof NodeSSK);
                        fireRequestSenderFinished(code);

Modified: trunk/freenet/src/freenet/node/RequestStarter.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestStarter.java  2008-02-02 17:34:22 UTC 
(rev 17478)
+++ trunk/freenet/src/freenet/node/RequestStarter.java  2008-02-02 20:05:30 UTC 
(rev 17479)
@@ -117,7 +117,7 @@
                                } while(now < sleepUntil);
                                String reason;
                                if(LOCAL_REQUESTS_COMPETE_FAIRLY) {
-                                       if((reason = 
stats.shouldRejectRequest(true, isInsert, isSSK, true, null)) != null) {
+                                       if((reason = 
stats.shouldRejectRequest(true, isInsert, isSSK, true, false, null)) != null) {
                                                if(logMINOR)
                                                        Logger.minor(this, "Not 
sending local request: "+reason);
                                                // Wait one throttle-delay 
before trying again

Modified: trunk/freenet/src/freenet/node/SSKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertHandler.java        2008-02-02 
17:34:22 UTC (rev 17478)
+++ trunk/freenet/src/freenet/node/SSKInsertHandler.java        2008-02-02 
20:05:30 UTC (rev 17479)
@@ -84,7 +84,7 @@
             Logger.error(this, "Caught "+t, t);
         } finally {
             if(logMINOR) Logger.minor(this, "Exiting InsertHandler.run() for 
"+uid);
-            node.unlockUID(uid, true, true, false);
+            node.unlockUID(uid, true, true, false, false);
         }
     }



Reply via email to