Author: toad
Date: 2008-02-02 20:05:30 +0000 (Sat, 02 Feb 2008)
New Revision: 17479
Modified:
trunk/freenet/src/freenet/io/comm/DMT.java
trunk/freenet/src/freenet/node/CHKInsertHandler.java
trunk/freenet/src/freenet/node/FailureTable.java
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/NodeClientCore.java
trunk/freenet/src/freenet/node/NodeDispatcher.java
trunk/freenet/src/freenet/node/NodeStats.java
trunk/freenet/src/freenet/node/PeerNode.java
trunk/freenet/src/freenet/node/RequestHandler.java
trunk/freenet/src/freenet/node/RequestSender.java
trunk/freenet/src/freenet/node/RequestStarter.java
trunk/freenet/src/freenet/node/SSKInsertHandler.java
Log:
Another big chunk of ULPRs.
Modified: trunk/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/DMT.java 2008-02-02 17:34:22 UTC (rev
17478)
+++ trunk/freenet/src/freenet/io/comm/DMT.java 2008-02-02 20:05:30 UTC (rev
17479)
@@ -858,6 +858,39 @@
return msg;
}
+ // Short timeout so PRIORITY_HIGH
+ public static MessageType FNPGetOfferedKey = new
MessageType("FNPGetOfferedKey", PRIORITY_HIGH) {{
+ addField(KEY, Key.class);
+ addField(OFFER_AUTHENTICATOR, ShortBuffer.class);
+ addField(NEED_PUB_KEY, Boolean.class);
+ addField(UID, Long.class);
+ }};
+
+ public static Message createFNPGetOfferedKey(Key key, byte[]
authenticator, boolean needPubkey, long uid) {
+ Message msg = new Message(FNPGetOfferedKey);
+ msg.set(KEY, key);
+ msg.set(OFFER_AUTHENTICATOR, new ShortBuffer(authenticator));
+ msg.set(NEED_PUB_KEY, needPubkey);
+ msg.set(UID, uid);
+ return msg;
+ }
+
+ // Permanently rejected. RejectedOverload means temporarily rejected.
+ public static MessageType FNPGetOfferedKeyInvalid = new
MessageType("FNPGetOfferedKeyInvalid", PRIORITY_HIGH) {{ // short timeout
+ addField(UID, Long.class);
+ addField(REASON, Short.class);
+ }};
+
+ public static Message createFNPGetOfferedKeyInvalid(long uid, short
reason) {
+ Message msg = new Message(FNPGetOfferedKeyInvalid);
+ msg.set(UID, uid);
+ msg.set(REASON, reason);
+ return msg;
+ }
+
+ public static short GET_OFFERED_KEY_REJECTED_BAD_AUTHENTICATOR = 1;
+ public static short GET_OFFERED_KEY_REJECTED_NO_KEY = 2;
+
public static final MessageType FNPPing = new MessageType("FNPPing",
PRIORITY_HIGH) {{
addField(PING_SEQNO, Integer.class);
}};
Modified: trunk/freenet/src/freenet/node/CHKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertHandler.java 2008-02-02
17:34:22 UTC (rev 17478)
+++ trunk/freenet/src/freenet/node/CHKInsertHandler.java 2008-02-02
20:05:30 UTC (rev 17479)
@@ -80,7 +80,7 @@
Logger.error(this, "Caught in run() "+t, t);
} finally {
if(logMINOR) Logger.minor(this, "Exiting CHKInsertHandler.run()
for "+uid);
- node.unlockUID(uid, false, true, false);
+ node.unlockUID(uid, false, true, false, false);
}
}
Modified: trunk/freenet/src/freenet/node/FailureTable.java
===================================================================
--- trunk/freenet/src/freenet/node/FailureTable.java 2008-02-02 17:34:22 UTC
(rev 17478)
+++ trunk/freenet/src/freenet/node/FailureTable.java 2008-02-02 20:05:30 UTC
(rev 17479)
@@ -4,10 +4,19 @@
package freenet.node;
import java.lang.ref.WeakReference;
+import java.util.Vector;
+import freenet.io.comm.DMT;
+import freenet.io.comm.Message;
+import freenet.io.comm.NotConnectedException;
+import freenet.io.xfer.BlockTransmitter;
+import freenet.io.xfer.PartiallyReceivedBlock;
+import freenet.keys.CHKBlock;
import freenet.keys.Key;
import freenet.keys.KeyBlock;
import freenet.keys.NodeCHK;
+import freenet.keys.NodeSSK;
+import freenet.keys.SSKBlock;
import freenet.support.LRUHashtable;
// FIXME it is ESSENTIAL that we delete the ULPR data on requestors etc once
we have found the key.
@@ -140,7 +149,7 @@
this.offers = new BlockOffer[] { offer };
}
- public long expires() {
+ public synchronized long expires() {
long last = 0;
for(int i=0;i<offers.length;i++) {
if(offers[i].offeredTime > last) last =
offers[i].offeredTime;
@@ -148,26 +157,62 @@
return last + OFFER_EXPIRY_TIME;
}
- public boolean isEmpty(long now) {
+ public synchronized boolean isEmpty(long now) {
for(int i=0;i<offers.length;i++) {
if(offers[i].offeredTime > now) return false;
}
return true;
}
+
+ public synchronized void deleteOffer(BlockOffer offer) {
+ int idx = -1;
+ for(int i=0;i<offers.length;i++) {
+ if(offers[i] == offer) idx = i;
+ }
+ if(idx == -1) return;
+ BlockOffer[] newOffers = new
BlockOffer[offers.length-1];
+ if(idx > 0)
+ System.arraycopy(offers, 0, newOffers, 0, idx);
+ if(idx < newOffers.length)
+ System.arraycopy(offers, idx+1, newOffers, idx,
offers.length-idx);
+ offers = newOffers;
+ }
+
+ public synchronized void addOffer(BlockOffer offer) {
+ BlockOffer[] newOffers = new
BlockOffer[offers.length+1];
+ System.arraycopy(offers, 0, newOffers, 0,
offers.length);
+ newOffers[offers.length] = offer;
+ offers = newOffers;
+ }
}
- private final class BlockOffer {
+ final class BlockOffer {
final long offeredTime;
/** Either offered by or offered to this node */
final WeakReference nodeRef;
/** Authenticator */
final byte[] authenticator;
+ /** Boot ID when the offer was made */
+ final long bootID;
- BlockOffer(PeerNode pn, long now, byte[] authenticator) {
+ BlockOffer(PeerNode pn, long now, byte[] authenticator, long
bootID) {
this.nodeRef = pn.myRef;
this.offeredTime = now;
this.authenticator = authenticator;
+ this.bootID = bootID;
}
+
+ public PeerNode getPeerNode() {
+ return (PeerNode) nodeRef.get();
+ }
+
+ public boolean isExpired(long now) {
+ return now > (offeredTime + OFFER_EXPIRY_TIME);
+ }
+
+ public boolean isExpired() {
+ return isExpired(System.currentTimeMillis());
+ }
}
/**
@@ -181,6 +226,7 @@
entry = (FailureTableEntry) entriesByKey.get(key);
if(entry == null) return; // Nobody cares
entriesByKey.removeKey(key);
+ blockOfferListByKey.removeKey(key);
}
entry.offer();
}
@@ -250,9 +296,11 @@
// Add to offers list
BlockOfferList bl = (BlockOfferList)
blockOfferListByKey.get(key);
- BlockOffer offer = new BlockOffer(peer, now,
authenticator);
+ BlockOffer offer = new BlockOffer(peer, now,
authenticator, peer.getBootID());
if(bl == null) {
bl = new BlockOfferList(entry, offer);
+ } else {
+ bl.addOffer(offer);
}
blockOfferListByKey.push(key, offer);
trimOffersList(now);
@@ -276,4 +324,116 @@
}
}
}
+
+ /**
+ * We offered a key, a node has responded to the offer. Note that this
runs on the incoming
+ * packets thread so should allocate a new thread if it does anything
heavy.
+ * @param key The key to send.
+ * @param isSSK Whether it is an SSK.
+ * @param uid The UID.
+ * @param source The node that asked for the key.
+ * @throws NotConnectedException If the sender ceases to be connected.
+ */
+ public void sendOfferedKey(Key key, boolean isSSK, boolean needPubKey,
long uid, PeerNode source) throws NotConnectedException {
+ if(isSSK) {
+ SSKBlock block = node.fetch((NodeSSK)key, false);
+ if(block == null) {
+ // Don't have the key
+
source.sendAsync(DMT.createFNPGetOfferedKeyInvalid(uid,
DMT.GET_OFFERED_KEY_REJECTED_NO_KEY), null, 0, null);
+ return;
+ }
+ Message df = DMT.createFNPSSKDataFound(uid,
block.getRawHeaders(), block.getRawData());
+ source.sendAsync(df, null, 0, null);
+ if(needPubKey) {
+ Message pk = DMT.createFNPSSKPubKey(uid,
block.getPubKey());
+ source.sendAsync(pk, null, 0, null);
+ }
+ } else {
+ CHKBlock block = node.fetch((NodeCHK)key, false);
+ if(block == null) {
+ // Don't have the key
+
source.sendAsync(DMT.createFNPGetOfferedKeyInvalid(uid,
DMT.GET_OFFERED_KEY_REJECTED_NO_KEY), null, 0, null);
+ return;
+ }
+ Message df = DMT.createFNPCHKDataFound(uid,
block.getRawHeaders());
+ source.sendAsync(df, null, 0, null);
+ PartiallyReceivedBlock prb =
+ new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK,
Node.PACKET_SIZE, block.getRawData());
+ BlockTransmitter bt =
+ new BlockTransmitter(node.usm, source, uid, prb,
node.outputThrottle, null);
+ bt.sendAsync(node.executor);
+ }
+ }
+
+ class OfferList {
+
+ OfferList(BlockOfferList offerList) {
+ this.offerList = offerList;
+ recentOffers = new Vector();
+ expiredOffers = new Vector();
+ long now = System.currentTimeMillis();
+ BlockOffer[] offers = offerList.offers;
+ for(int i=0;i<offers.length;i++) {
+ if(!offers[i].isExpired(now))
+ recentOffers.add(offers[i]);
+ else
+ expiredOffers.add(offers[i]);
+ }
+ }
+
+ private final BlockOfferList offerList;
+
+ private final Vector recentOffers;
+ private final Vector expiredOffers;
+
+ /** The last offer we returned */
+ private BlockOffer lastOffer;
+
+ public BlockOffer getFirstOffer() {
+ if(lastOffer != null) {
+ throw new IllegalStateException("Last offer not
dealt with");
+ }
+ if(!recentOffers.isEmpty()) {
+ int x =
node.random.nextInt(recentOffers.size());
+ return lastOffer = (BlockOffer)
recentOffers.remove(x);
+ }
+ if(!expiredOffers.isEmpty()) {
+ int x =
node.random.nextInt(expiredOffers.size());
+ return lastOffer = (BlockOffer)
expiredOffers.remove(x);
+ }
+ // No more offers.
+ return null;
+ }
+
+ /**
+ * Delete the last offer - we have used it, successfully or not.
+ */
+ public void deleteLastOffer() {
+ offerList.deleteOffer(lastOffer);
+ lastOffer = null;
+ }
+
+ /**
+ * Keep the last offer - we weren't able to use it e.g. because
of RejectedOverload.
+ * Maybe it will be useful again in the future.
+ */
+ public void keepLastOffer() {
+ lastOffer = null;
+ }
+
+ }
+
+ public OfferList getOffers(Key key) {
+ BlockOfferList bl;
+ synchronized(this) {
+ bl = (BlockOfferList) blockOfferListByKey.get(key);
+ if(bl == null) return null;
+ }
+ return new OfferList(bl);
+ }
+
+ /** Called when a node disconnects */
+ public void onDisconnect(final PeerNode pn) {
+ // FIXME do something (off thread if expensive)
+ }
}
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2008-02-02 17:34:22 UTC (rev
17478)
+++ trunk/freenet/src/freenet/node/Node.java 2008-02-02 20:05:30 UTC (rev
17479)
@@ -319,6 +319,8 @@
private final HashSet runningSSKGetUIDs;
private final HashSet runningCHKPutUIDs;
private final HashSet runningSSKPutUIDs;
+ private final HashSet runningCHKOfferReplyUIDs;
+ private final HashSet runningSSKOfferReplyUIDs;
/** Semi-unique ID for swap requests. Used to identify us so that the
* topology can be reconstructed. */
@@ -613,6 +615,8 @@
runningSSKGetUIDs = new HashSet();
runningCHKPutUIDs = new HashSet();
runningSSKPutUIDs = new HashSet();
+ runningCHKOfferReplyUIDs = new HashSet();
+ runningSSKOfferReplyUIDs = new HashSet();
// Directory for node-related files other than store
@@ -2179,10 +2183,10 @@
return is;
}
- public boolean lockUID(long uid, boolean ssk, boolean insert) {
+ public boolean lockUID(long uid, boolean ssk, boolean insert, boolean
offerReply) {
if(logMINOR) Logger.minor(this, "Locking "+uid);
Long l = new Long(uid);
- HashSet set = getUIDTracker(ssk, insert);
+ HashSet set = getUIDTracker(ssk, insert, offerReply);
synchronized(set) {
set.add(l);
}
@@ -2193,11 +2197,11 @@
}
}
- public void unlockUID(long uid, boolean ssk, boolean insert, boolean
canFail) {
+ public void unlockUID(long uid, boolean ssk, boolean insert, boolean
canFail, boolean offerReply) {
if(logMINOR) Logger.minor(this, "Unlocking "+uid);
Long l = new Long(uid);
completed(uid);
- HashSet set = getUIDTracker(ssk, insert);
+ HashSet set = getUIDTracker(ssk, insert, offerReply);
synchronized(set) {
set.remove(l);
}
@@ -2207,10 +2211,14 @@
}
}
- HashSet getUIDTracker(boolean ssk, boolean insert) {
+ HashSet getUIDTracker(boolean ssk, boolean insert, boolean offerReply) {
if(ssk) {
+ if(offerReply)
+ return runningSSKOfferReplyUIDs;
return insert ? runningSSKPutUIDs : runningSSKGetUIDs;
} else {
+ if(offerReply)
+ return runningCHKOfferReplyUIDs;
return insert ? runningCHKPutUIDs : runningCHKGetUIDs;
}
}
@@ -2289,6 +2297,14 @@
return runningCHKPutUIDs.size();
}
+ public int getNumSSKOfferReplies() {
+ return runningSSKOfferReplyUIDs.size();
+ }
+
+ public int getNumCHKOfferReplies() {
+ return runningCHKOfferReplyUIDs.size();
+ }
+
public int getNumTransferringRequestSenders() {
synchronized(transferringRequestSenders) {
return transferringRequestSenders.size();
Modified: trunk/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeClientCore.java 2008-02-02 17:34:22 UTC
(rev 17478)
+++ trunk/freenet/src/freenet/node/NodeClientCore.java 2008-02-02 20:05:30 UTC
(rev 17479)
@@ -450,7 +450,7 @@
logMINOR = Logger.shouldLog(Logger.MINOR, this);
long startTime = System.currentTimeMillis();
long uid = random.nextLong();
- if(!node.lockUID(uid, false, false)) {
+ if(!node.lockUID(uid, false, false, false)) {
Logger.error(this, "Could not lock UID just randomly
generated: "+uid+" - probably indicates broken PRNG");
throw new
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR);
}
@@ -507,7 +507,8 @@
(status ==
RequestSender.RECENTLY_FAILED) ||
(status ==
RequestSender.SUCCESS) ||
(status ==
RequestSender.ROUTE_NOT_FOUND) ||
- (status ==
RequestSender.VERIFY_FAILURE))) {
+ (status ==
RequestSender.VERIFY_FAILURE) ||
+ (status ==
RequestSender.GET_OFFER_VERIFY_FAILURE))) {
long rtt = System.currentTimeMillis() -
startTime;
if(!rejectedOverload)
requestStarters.requestCompleted(false, false);
@@ -539,8 +540,10 @@
case RequestSender.ROUTE_NOT_FOUND:
throw new
LowLevelGetException(LowLevelGetException.ROUTE_NOT_FOUND);
case RequestSender.TRANSFER_FAILED:
+ case RequestSender.GET_OFFER_TRANSFER_FAILED:
throw new
LowLevelGetException(LowLevelGetException.TRANSFER_FAILED);
case RequestSender.VERIFY_FAILURE:
+ case RequestSender.GET_OFFER_VERIFY_FAILURE:
throw new
LowLevelGetException(LowLevelGetException.VERIFY_FAILED);
case RequestSender.GENERATED_REJECTED_OVERLOAD:
case RequestSender.TIMED_OUT:
@@ -554,7 +557,7 @@
}
}
} finally {
- node.unlockUID(uid, false, false, true);
+ node.unlockUID(uid, false, false, true, false);
}
}
@@ -562,7 +565,7 @@
logMINOR = Logger.shouldLog(Logger.MINOR, this);
long startTime = System.currentTimeMillis();
long uid = random.nextLong();
- if(!node.lockUID(uid, true, false)) {
+ if(!node.lockUID(uid, true, false, false)) {
Logger.error(this, "Could not lock UID just randomly
generated: "+uid+" - probably indicates broken PRNG");
throw new
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR);
}
@@ -620,7 +623,8 @@
(status ==
RequestSender.RECENTLY_FAILED) ||
(status ==
RequestSender.SUCCESS) ||
(status ==
RequestSender.ROUTE_NOT_FOUND) ||
- (status ==
RequestSender.VERIFY_FAILURE))) {
+ (status ==
RequestSender.VERIFY_FAILURE) ||
+ (status ==
RequestSender.GET_OFFER_VERIFY_FAILURE))) {
long rtt = System.currentTimeMillis() -
startTime;
if(!rejectedOverload)
@@ -651,9 +655,11 @@
case RequestSender.ROUTE_NOT_FOUND:
throw new
LowLevelGetException(LowLevelGetException.ROUTE_NOT_FOUND);
case RequestSender.TRANSFER_FAILED:
+ case RequestSender.GET_OFFER_TRANSFER_FAILED:
Logger.error(this, "WTF? Transfer
failed on an SSK? on "+uid);
throw new
LowLevelGetException(LowLevelGetException.TRANSFER_FAILED);
case RequestSender.VERIFY_FAILURE:
+ case RequestSender.GET_OFFER_VERIFY_FAILURE:
throw new
LowLevelGetException(LowLevelGetException.VERIFY_FAILED);
case RequestSender.GENERATED_REJECTED_OVERLOAD:
case RequestSender.TIMED_OUT:
@@ -666,7 +672,7 @@
}
}
} finally {
- node.unlockUID(uid, true, false, true);
+ node.unlockUID(uid, true, false, true, false);
}
}
@@ -686,7 +692,7 @@
PartiallyReceivedBlock prb = new
PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, Node.PACKET_SIZE, data);
CHKInsertSender is;
long uid = random.nextLong();
- if(!node.lockUID(uid, false, true)) {
+ if(!node.lockUID(uid, false, true, false)) {
Logger.error(this, "Could not lock UID just randomly
generated: "+uid+" - probably indicates broken PRNG");
throw new
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
}
@@ -795,7 +801,7 @@
}
}
} finally {
- node.unlockUID(uid, false, true, true);
+ node.unlockUID(uid, false, true, true, false);
}
}
@@ -803,7 +809,7 @@
logMINOR = Logger.shouldLog(Logger.MINOR, this);
SSKInsertSender is;
long uid = random.nextLong();
- if(!node.lockUID(uid, true, true)) {
+ if(!node.lockUID(uid, true, true, false)) {
Logger.error(this, "Could not lock UID just randomly
generated: "+uid+" - probably indicates broken PRNG");
throw new
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
}
@@ -921,7 +927,7 @@
}
}
} finally {
- node.unlockUID(uid, true, true, true);
+ node.unlockUID(uid, true, true, true, false);
}
}
Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java 2008-02-02 17:34:22 UTC
(rev 17478)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java 2008-02-02 20:05:30 UTC
(rev 17479)
@@ -15,6 +15,7 @@
import freenet.io.comm.NotConnectedException;
import freenet.io.comm.Peer;
import freenet.keys.Key;
+import freenet.keys.NodeSSK;
import freenet.support.Fields;
import freenet.support.Logger;
import freenet.support.ShortBuffer;
@@ -160,7 +161,9 @@
// return handleProbeTrace(m, source);
} else if(spec == DMT.FNPOfferKey) {
return handleOfferKey(m, source);
- }
+ } else if(spec == DMT.FNPGetOfferedKey) {
+ return handleGetOfferedKey(m, source);
+ }
return false;
}
@@ -171,6 +174,52 @@
return true;
}
+ private boolean handleGetOfferedKey(Message m, PeerNode source) {
+ Key key = (Key) m.getObject(DMT.KEY);
+ byte[] authenticator = ((ShortBuffer)
m.getObject(DMT.OFFER_AUTHENTICATOR)).getData();
+ long uid = m.getLong(DMT.UID);
+ HMAC hash = new HMAC(SHA256.getInstance());
+ if(!hash.verify(node.failureTable.offerAuthenticatorKey,
key.getFullKey(), authenticator)) {
+ Logger.error(this, "Invalid offer from "+source+" :
authenticator did not verify");
+ try {
+
source.sendAsync(DMT.createFNPGetOfferedKeyInvalid(uid,
DMT.GET_OFFERED_KEY_REJECTED_BAD_AUTHENTICATOR), null, 0, null);
+ } catch (NotConnectedException e) {
+ // Too bad.
+ }
+ return true;
+ }
+ if(logMINOR) Logger.minor(this, "Valid GetOfferedKey for
"+key+" from "+source);
+
+ // Do we want it? We can RejectOverload if we don't have the
bandwidth...
+ boolean isSSK = key instanceof NodeSSK;
+ boolean needPubKey = m.getBoolean(DMT.NEED_PUB_KEY);
+ if(isSSK) {
+
+ }
+ String reject =
+ nodeStats.shouldRejectRequest(true, false, isSSK,
false, true, source);
+ if(reject != null) {
+ Logger.normal(this, "Rejecting FNPGetOfferedKey from
"+source+" for "+key+" : "+reject);
+ Message rejected = DMT.createFNPRejectedOverload(uid,
true);
+ try {
+ source.sendAsync(rejected, null, 0, null);
+ } catch (NotConnectedException e) {
+ Logger.normal(this, "Rejecting (overload) data
request from "+source.getPeer()+": "+e);
+ }
+ node.unlockUID(uid, isSSK, false, false, false);
+ return true;
+ }
+
+ // Accept it.
+
+ try {
+ node.failureTable.sendOfferedKey(key, isSSK,
needPubKey, uid, source);
+ } catch (NotConnectedException e) {
+ // Too bad.
+ }
+ return true;
+ }
+
private void handleDisconnect(final Message m, final PeerNode source) {
// Must run ON the packet sender thread as it sends a packet
directly
node.getTicker().queueTimedJob(new FastRunnable() {
@@ -229,7 +278,7 @@
}
return true;
}
- if(!node.lockUID(id, isSSK, false)) {
+ if(!node.lockUID(id, isSSK, false, false)) {
if(logMINOR) Logger.minor(this, "Could not lock ID
"+id+" -> rejecting (already running)");
Message rejected = DMT.createFNPRejectedLoop(id);
try {
@@ -241,7 +290,7 @@
} else {
if(logMINOR) Logger.minor(this, "Locked "+id);
}
- String rejectReason = nodeStats.shouldRejectRequest(!isSSK,
false, isSSK, false, source);
+ String rejectReason = nodeStats.shouldRejectRequest(!isSSK,
false, isSSK, false, false, source);
if(rejectReason != null) {
// can accept 1 CHK request every so often, but not
with SSKs because they aren't throttled so won't sort out bwlimitDelayTime,
which was the whole reason for accepting them when overloaded...
Logger.normal(this, "Rejecting request from
"+source.getPeer()+" preemptively because "+rejectReason);
@@ -251,7 +300,7 @@
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting (overload) data
request from "+source.getPeer()+": "+e);
}
- node.unlockUID(id, isSSK, false, false);
+ node.unlockUID(id, isSSK, false, false, false);
return true;
}
//if(!node.lockUID(id)) return false;
@@ -271,7 +320,7 @@
}
return true;
}
- if(!node.lockUID(id, isSSK, true)) {
+ if(!node.lockUID(id, isSSK, true, false)) {
if(logMINOR) Logger.minor(this, "Could not lock ID
"+id+" -> rejecting (already running)");
Message rejected = DMT.createFNPRejectedLoop(id);
try {
@@ -282,7 +331,7 @@
return true;
}
// SSKs don't fix bwlimitDelayTime so shouldn't be accepted
when overloaded.
- String rejectReason = nodeStats.shouldRejectRequest(!isSSK,
true, isSSK, false, source);
+ String rejectReason = nodeStats.shouldRejectRequest(!isSSK,
true, isSSK, false, false, source);
if(rejectReason != null) {
Logger.normal(this, "Rejecting insert from
"+source.getPeer()+" preemptively because "+rejectReason);
Message rejected = DMT.createFNPRejectedOverload(id,
true);
@@ -291,7 +340,7 @@
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting (overload)
insert request from "+source.getPeer()+": "+e);
}
- node.unlockUID(id, isSSK, true, false);
+ node.unlockUID(id, isSSK, true, false, false);
return true;
}
long now = System.currentTimeMillis();
Modified: trunk/freenet/src/freenet/node/NodeStats.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeStats.java 2008-02-02 17:34:22 UTC
(rev 17478)
+++ trunk/freenet/src/freenet/node/NodeStats.java 2008-02-02 20:05:30 UTC
(rev 17479)
@@ -113,10 +113,14 @@
final TimeDecayingRunningAverage successfulSskFetchBytesSentAverage;
final TimeDecayingRunningAverage successfulChkInsertBytesSentAverage;
final TimeDecayingRunningAverage successfulSskInsertBytesSentAverage;
+ final TimeDecayingRunningAverage
successfulChkOfferReplyBytesSentAverage;
+ final TimeDecayingRunningAverage
successfulSskOfferReplyBytesSentAverage;
final TimeDecayingRunningAverage successfulChkFetchBytesReceivedAverage;
final TimeDecayingRunningAverage successfulSskFetchBytesReceivedAverage;
final TimeDecayingRunningAverage
successfulChkInsertBytesReceivedAverage;
final TimeDecayingRunningAverage
successfulSskInsertBytesReceivedAverage;
+ final TimeDecayingRunningAverage
successfulChkOfferReplyBytesReceivedAverage;
+ final TimeDecayingRunningAverage
successfulSskOfferReplyBytesReceivedAverage;
final TrivialRunningAverage globalFetchPSuccess;
final TrivialRunningAverage chkFetchPSuccess;
@@ -312,10 +316,14 @@
successfulSskFetchBytesSentAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("SuccessfulSskFetchBytesSentAverage"), node);
successfulChkInsertBytesSentAverage = new
TimeDecayingRunningAverage(32768+32768+1024, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("SuccessfulChkInsertBytesSentAverage"), node);
successfulSskInsertBytesSentAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("SuccessfulSskInsertBytesSentAverage"), node);
+ successfulChkOfferReplyBytesSentAverage = new
TimeDecayingRunningAverage(32768+500, 180000, 0.0, 1024*1024*1024, throttleFS
== null ? null : throttleFS.subset("successfulChkOfferReplyBytesSentAverage"),
node);
+ successfulSskOfferReplyBytesSentAverage = new
TimeDecayingRunningAverage(3072, 180000, 0.0, 1024*1024*1024, throttleFS ==
null ? null : throttleFS.subset("successfulSskOfferReplyBytesSentAverage"),
node);
successfulChkFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(32768+1024+500+2048/*path folding*/, 180000, 0.0,
1024*1024*1024, throttleFS == null ? null :
throttleFS.subset("SuccessfulChkFetchBytesReceivedAverage"), node);
successfulSskFetchBytesReceivedAverage = new
TimeDecayingRunningAverage(2048+500, 180000, 0.0, 1024*1024*1024, throttleFS ==
null ? null : throttleFS.subset("SuccessfulSskFetchBytesReceivedAverage"),
node);
successfulChkInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(32768+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("SuccessfulChkInsertBytesReceivedAverage"), node);
successfulSskInsertBytesReceivedAverage = new
TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024,
throttleFS == null ? null :
throttleFS.subset("SuccessfulSskInsertBytesReceivedAverage"), node);
+ successfulChkOfferReplyBytesReceivedAverage = new
TimeDecayingRunningAverage(32768+500, 180000, 0.0, 1024*1024*1024, throttleFS
== null ? null :
throttleFS.subset("successfulChkOfferReplyBytesReceivedAverage"), node);
+ successfulSskOfferReplyBytesReceivedAverage = new
TimeDecayingRunningAverage(3072, 180000, 0.0, 1024*1024*1024, throttleFS ==
null ? null : throttleFS.subset("successfulSskOfferReplyBytesReceivedAverage"),
node);
globalFetchPSuccess = new TrivialRunningAverage();
chkFetchPSuccess = new TrivialRunningAverage();
@@ -390,7 +398,7 @@
};
/* return reject reason as string if should reject, otherwise return
null */
- public String shouldRejectRequest(boolean canAcceptAnyway, boolean
isInsert, boolean isSSK, boolean isLocal, PeerNode source) {
+ public String shouldRejectRequest(boolean canAcceptAnyway, boolean
isInsert, boolean isSSK, boolean isLocal, boolean isOfferReply, PeerNode
source) {
logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(logMINOR) dumpByteCostAverages();
@@ -460,6 +468,8 @@
int numSSKRequests = node.getNumSSKRequests() + 1;
int numCHKInserts = node.getNumCHKInserts() + 1;
int numSSKInserts = node.getNumSSKInserts() + 1;
+ int numCHKOfferReplies = node.getNumCHKOfferReplies() + 1;
+ int numSSKOfferReplies = node.getNumSSKOfferReplies() + 1;
if(logMINOR)
Logger.minor(this, "Running (adjusted): CHK fetch
"+numCHKRequests+" SSK fetch "+numSSKRequests+" CHK insert "+numCHKInserts+"
SSK insert "+numSSKInserts);
@@ -612,7 +622,9 @@
" CHK insert
"+successfulChkInsertBytesSentAverage.currentValue()+ '/'
+successfulChkInsertBytesReceivedAverage.currentValue()+
" SSK insert
"+successfulSskInsertBytesSentAverage.currentValue()+ '/'
+successfulSskInsertBytesReceivedAverage.currentValue()+
" CHK fetch
"+successfulChkFetchBytesSentAverage.currentValue()+ '/'
+successfulChkFetchBytesReceivedAverage.currentValue()+
- " SSK fetch
"+successfulSskFetchBytesSentAverage.currentValue()+ '/'
+successfulSskFetchBytesReceivedAverage.currentValue());
+ " SSK fetch
"+successfulSskFetchBytesSentAverage.currentValue()+ '/'
+successfulSskFetchBytesReceivedAverage.currentValue()+
+ " CHK offer reply
"+successfulChkOfferReplyBytesSentAverage.currentValue()+ '/'
+successfulChkOfferReplyBytesReceivedAverage.currentValue()+
+ " SSK offer reply
"+successfulSskOfferReplyBytesSentAverage.currentValue()+ '/'
+successfulSskOfferReplyBytesReceivedAverage.currentValue());
}
@@ -693,10 +705,14 @@
fs.put("SuccessfulSskFetchBytesSentAverage",
successfulSskFetchBytesSentAverage.exportFieldSet(true));
fs.put("SuccessfulChkInsertBytesSentAverage",
successfulChkInsertBytesSentAverage.exportFieldSet(true));
fs.put("SuccessfulSskInsertBytesSentAverage",
successfulSskInsertBytesSentAverage.exportFieldSet(true));
+ fs.put("SuccessfulChkOfferReplyBytesSentAverage",
successfulChkOfferReplyBytesSentAverage.exportFieldSet(true));
+ fs.put("SuccessfulSskOfferReplyBytesSentAverage",
successfulSskOfferReplyBytesSentAverage.exportFieldSet(true));
fs.put("SuccessfulChkFetchBytesReceivedAverage",
successfulChkFetchBytesReceivedAverage.exportFieldSet(true));
fs.put("SuccessfulSskFetchBytesReceivedAverage",
successfulSskFetchBytesReceivedAverage.exportFieldSet(true));
fs.put("SuccessfulChkInsertBytesReceivedAverage",
successfulChkInsertBytesReceivedAverage.exportFieldSet(true));
fs.put("SuccessfulSskInsertBytesReceivedAverage",
successfulSskInsertBytesReceivedAverage.exportFieldSet(true));
+ fs.put("SuccessfulChkOfferReplyBytesReceivedAverage",
successfulChkOfferReplyBytesReceivedAverage.exportFieldSet(true));
+ fs.put("SuccessfulSskOfferReplyBytesReceivedAverage",
successfulSskOfferReplyBytesReceivedAverage.exportFieldSet(true));
//These are not really part of the 'throttling' data, but are
also running averages which should be persisted
fs.put("AverageCacheLocation",
avgCacheLocation.exportFieldSet(true));
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2008-02-02 17:34:22 UTC
(rev 17478)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2008-02-02 20:05:30 UTC
(rev 17479)
@@ -1116,6 +1116,7 @@
final long now = System.currentTimeMillis();
Logger.normal(this, "Disconnected " + this);
node.usm.onDisconnect(this);
+ node.failureTable.onDisconnect(this);
node.peers.disconnected(this);
boolean ret;
synchronized(this) {
Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java 2008-02-02 17:34:22 UTC
(rev 17478)
+++ trunk/freenet/src/freenet/node/RequestHandler.java 2008-02-02 20:05:30 UTC
(rev 17479)
@@ -84,11 +84,11 @@
} catch (NotConnectedException e) {
Logger.normal(this, "requestor gone, could not start request
handler wait");
node.removeTransferringRequestHandler(uid);
- node.unlockUID(uid, key instanceof NodeSSK, false, false);
+ node.unlockUID(uid, key instanceof NodeSSK, false, false, false);
} catch (Throwable t) {
Logger.error(this, "Caught "+t, t);
node.removeTransferringRequestHandler(uid);
- node.unlockUID(uid, key instanceof NodeSSK, false, false);
+ node.unlockUID(uid, key instanceof NodeSSK, false, false, false);
}
}
@@ -288,6 +288,7 @@
}
return;
case RequestSender.VERIFY_FAILURE:
+ case RequestSender.GET_OFFER_VERIFY_FAILURE:
if(key instanceof NodeCHK) {
if(bt == null) {
// Bug! This is impossible!
@@ -305,6 +306,7 @@
sendTerminal(reject);
return;
case RequestSender.TRANSFER_FAILED:
+ case RequestSender.GET_OFFER_TRANSFER_FAILED:
if(key instanceof NodeCHK) {
if(bt == null) {
// Bug! This is impossible!
@@ -372,7 +374,7 @@
private void unregisterRequestHandlerWithNode() {
node.removeTransferringRequestHandler(uid);
- node.unlockUID(uid, key instanceof NodeSSK, false, false);
+ node.unlockUID(uid, key instanceof NodeSSK, false, false,
false);
}
/**
Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java 2008-02-02 17:34:22 UTC
(rev 17478)
+++ trunk/freenet/src/freenet/node/RequestSender.java 2008-02-02 20:05:30 UTC
(rev 17479)
@@ -28,6 +28,8 @@
import freenet.keys.NodeSSK;
import freenet.keys.SSKBlock;
import freenet.keys.SSKVerifyException;
+import freenet.node.FailureTable.BlockOffer;
+import freenet.node.FailureTable.OfferList;
import freenet.store.KeyCollisionException;
import freenet.support.Logger;
import freenet.support.ShortBuffer;
@@ -49,6 +51,7 @@
// Constants
static final int ACCEPTED_TIMEOUT = 5000;
+ static final int GET_OFFER_TIMEOUT = 10000;
static final int FETCH_TIMEOUT = 120000;
/** Wait up to this long to get a path folding reply */
static final int OPENNET_TIMEOUT = 120000;
@@ -72,7 +75,10 @@
private byte[] sskData;
private SSKBlock block;
private boolean hasForwarded;
-
+
+ /** If true, only try to fetch the key from nodes which have offered it */
+ private boolean tryOffersOnly;
+
private ArrayList listeners=new ArrayList();
// Terminal status
@@ -89,6 +95,8 @@
static final int GENERATED_REJECTED_OVERLOAD = 7;
static final int INTERNAL_ERROR = 8;
static final int RECENTLY_FAILED = 9;
+ static final int GET_OFFER_VERIFY_FAILURE = 10;
+ static final int GET_OFFER_TRANSFER_FAILED = 11;
private PeerNode successFrom;
static String getStatusString(int status) {
@@ -103,8 +111,12 @@
return "DATA NOT FOUND";
case TRANSFER_FAILED:
return "TRANSFER FAILED";
+ case GET_OFFER_TRANSFER_FAILED:
+ return "GET OFFER TRANSFER FAILED";
case VERIFY_FAILURE:
return "VERIFY FAILURE";
+ case GET_OFFER_VERIFY_FAILURE:
+ return "GET OFFER VERIFY FAILURE";
case TIMED_OUT:
return "TIMED OUT";
case GENERATED_REJECTED_OVERLOAD:
@@ -158,7 +170,7 @@
realRun();
} catch (Throwable t) {
Logger.error(this, "Caught "+t, t);
- finish(INTERNAL_ERROR, null);
+ finish(INTERNAL_ERROR, null, false);
} finally {
if(logMINOR) Logger.minor(this, "Leaving RequestSender.run()
for "+uid);
node.removeRequestSender(key, origHTL, this);
@@ -171,6 +183,191 @@
pubKey = ((NodeSSK)key).getPubKey();
}
+ // First ask any nodes that have offered the data
+
+ OfferList offers = node.failureTable.getOffers(key);
+
+ while(true) {
+ // Fetches valid offers, then expired ones. Expired offers
don't count towards failures,
+ // but they're still worth trying.
+ BlockOffer offer = offers.getFirstOffer();
+ if(offer == null) break;
+ PeerNode pn = offer.getPeerNode();
+ if(pn == null) {
+ offers.deleteLastOffer();
+ continue;
+ }
+ if(pn.getBootID() != offer.bootID) {
+ offers.deleteLastOffer();
+ continue;
+ }
+ Message msg = DMT.createFNPGetOfferedKey(key,
offer.authenticator, pubKey == null, uid);
+ try {
+ pn.sendAsync(msg, null, 0, this);
+ } catch (NotConnectedException e2) {
+ if(logMINOR)
+ Logger.minor(this, "Disconnected:
"+pn+" getting offer for "+key);
+ offers.deleteLastOffer();
+ continue;
+ }
+ MessageFilter mfRO =
MessageFilter.create().setSource(pn).setField(DMT.UID,
uid).setTimeout(GET_OFFER_TIMEOUT).setType(DMT.FNPRejectedOverload);
+ MessageFilter mfGetInvalid =
MessageFilter.create().setSource(pn).setField(DMT.UID,
uid).setTimeout(GET_OFFER_TIMEOUT).setType(DMT.FNPGetOfferedKeyInvalid);
+ // Wait for a response.
+ if(key instanceof NodeCHK) {
+ // Headers first, then block transfer.
+ MessageFilter mfDF =
MessageFilter.create().setSource(pn).setField(DMT.UID,
uid).setTimeout(GET_OFFER_TIMEOUT).setType(DMT.FNPCHKDataFound);
+ Message reply;
+ try {
+ reply =
node.usm.waitFor(mfDF.or(mfRO.or(mfGetInvalid)), this);
+ } catch (DisconnectedException e2) {
+ if(logMINOR)
+ Logger.minor(this,
"Disconnected: "+pn+" getting offer for "+key);
+ offers.deleteLastOffer();
+ continue;
+ }
+ if(reply == null) {
+ // We gave it a chance, don't give it another.
+ offers.deleteLastOffer();
+ continue;
+ } else if(reply.getSpec() == DMT.FNPRejectedOverload) {
+ // Non-fatal, keep it.
+ if(logMINOR)
+ Logger.minor(this, "Node "+pn+"
rejected FNPGetOfferedKey for "+key+" (expired="+offer.isExpired());
+ offers.keepLastOffer();
+ continue;
+ } else if(reply.getSpec() ==
DMT.FNPGetOfferedKeyInvalid) {
+ // Fatal, delete it.
+ if(logMINOR)
+ Logger.minor(this, "Node "+pn+"
rejected FNPGetOfferedKey as invalid with reason "+reply.getShort(DMT.REASON));
+ offers.deleteLastOffer();
+ continue;
+ } else if(reply.getSpec() == DMT.FNPCHKDataFound) {
+ headers =
((ShortBuffer)reply.getObject(DMT.BLOCK_HEADERS)).getData();
+ // Receive the data
+
+ // FIXME: Validate headers
+
+ node.addTransferringSender((NodeCHK)key, this);
+
+ try {
+
+ prb = new
PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, Node.PACKET_SIZE);
+
+ synchronized(this) {
+ notifyAll();
+ }
+ fireCHKTransferBegins();
+
+ BlockReceiver br = new BlockReceiver(node.usm,
pn, uid, prb, this);
+
+ try {
+ if(logMINOR) Logger.minor(this,
"Receiving data");
+ byte[] data = br.receive();
+ if(logMINOR) Logger.minor(this,
"Received data");
+ // Received data
+ try {
+ verifyAndCommit(data);
+ } catch (KeyVerifyException e1) {
+ Logger.normal(this, "Got data
but verify failed: "+e1, e1);
+
finish(GET_OFFER_VERIFY_FAILURE, pn, true);
+ node.failureTable.onFailure(key, htl,
new PeerNode[] { source }, pn, -1, System.currentTimeMillis());
+ offers.deleteLastOffer();
+ return;
+ }
+ finish(SUCCESS, pn, true);
+ return;
+ } catch (RetrievalException e) {
+ if
(e.getReason()==RetrievalException.SENDER_DISCONNECTED)
+
Logger.normal(this, "Transfer failed (disconnect): "+e, e);
+ else
+
Logger.error(this, "Transfer failed
("+e.getReason()+"/"+RetrievalException.getErrString(e.getReason())+"): "+e, e);
+ finish(GET_OFFER_TRANSFER_FAILED, pn,
true);
+ node.failureTable.onFailure(key, htl, new
PeerNode[] { source }, pn, -1, System.currentTimeMillis());
+ offers.deleteLastOffer();
+ return;
+ }
+ } finally {
+ node.removeTransferringSender((NodeCHK)key,
this);
+ }
+ }
+ } else {
+ // Data, possibly followed by pubkey
+ MessageFilter mfDF =
MessageFilter.create().setSource(pn).setField(DMT.UID,
uid).setTimeout(GET_OFFER_TIMEOUT).setType(DMT.FNPSSKDataFound);
+ Message reply;
+ try {
+ reply =
node.usm.waitFor(mfDF.or(mfRO.or(mfGetInvalid)), this);
+ } catch (DisconnectedException e) {
+ if(logMINOR)
+ Logger.minor(this,
"Disconnected: "+pn+" getting offer for "+key);
+ offers.deleteLastOffer();
+ continue;
+ }
+ if(reply == null) {
+ offers.deleteLastOffer();
+ continue;
+ } else if(reply.getSpec() == DMT.FNPRejectedOverload) {
+ // Non-fatal, keep it.
+ if(logMINOR)
+ Logger.minor(this, "Node "+pn+"
rejected FNPGetOfferedKey for "+key+" (expired="+offer.isExpired());
+ offers.keepLastOffer();
+ continue;
+ } else if(reply.getSpec() ==
DMT.FNPGetOfferedKeyInvalid) {
+ // Fatal, delete it.
+ if(logMINOR)
+ Logger.minor(this, "Node "+pn+"
rejected FNPGetOfferedKey as invalid with reason "+reply.getShort(DMT.REASON));
+ offers.deleteLastOffer();
+ continue;
+ } else if(reply.getSpec() == DMT.FNPSSKDataFound) {
+ // Receive the data
+ headers = ((ShortBuffer)
reply.getObject(DMT.BLOCK_HEADERS)).getData();
+ sskData = ((ShortBuffer)
reply.getObject(DMT.DATA)).getData();
+ if(pubKey != null) {
+ MessageFilter mfPK =
MessageFilter.create().setSource(pn).setField(DMT.UID,
uid).setTimeout(GET_OFFER_TIMEOUT).setType(DMT.FNPSSKPubKey);
+ Message pk;
+ try {
+ pk =
node.usm.waitFor(mfPK, this);
+ } catch (DisconnectedException
e) {
+ if(logMINOR)
+
Logger.minor(this, "Disconnected: "+pn+" getting pubkey for offer for "+key);
+
offers.deleteLastOffer();
+ continue;
+ }
+ if(pk == null) {
+ Logger.error(this, "Got data
but not pubkey from "+pn+" for offer for "+key);
+ offers.deleteLastOffer();
+ continue;
+ }
+ try {
+ pubKey =
DSAPublicKey.create(((ShortBuffer)pk.getObject(DMT.PUBKEY_AS_BYTES)).getData());
+ } catch (CryptFormatException
e) {
+ Logger.error(this,
"Bogus pubkey from "+pn+" for offer for "+key+" : "+e, e);
+ offers.deleteLastOffer();
+ continue;
+ }
+ }
+
+ try {
+
((NodeSSK)key).setPubKey(pubKey);
+ } catch (SSKVerifyException e) {
+ Logger.error(this, "Bogus SSK
data from "+pn+" for offer for "+key+" : "+e, e);
+ offers.deleteLastOffer();
+ continue;
+ }
+
+ if(finishSSKFromGetOffer(pn)) {
+ if(logMINOR) Logger.minor(this,
"Successfully fetched SSK from offer from "+pn+" for "+key);
+ return;
+ } else {
+ offers.deleteLastOffer();
+ continue;
+ }
+ }
+ }
+ // RejectedOverload is possible - but we need to include it in
the statistics.
+ // We don't remove the offer in that case. Otherwise we do,
even if it fails.
+ // FNPGetOfferedKeyInvalid is also possible.
+ }
+
int routeAttempts=0;
int rejectOverloads=0;
HashSet nodesRoutedTo = new HashSet();
@@ -180,7 +377,7 @@
if(htl == 0) {
// This used to be RNF, I dunno why
//???: finish(GENERATED_REJECTED_OVERLOAD,
null);
- finish(DATA_NOT_FOUND, null);
+ finish(DATA_NOT_FOUND, null, false);
node.failureTable.onFailure(key, htl, new PeerNode[] {
source }, null, FailureTable.REJECT_TIME, System.currentTimeMillis());
return;
}
@@ -195,7 +392,7 @@
if (logMINOR && rejectOverloads>0)
Logger.minor(this, "no more peers, but
overloads ("+rejectOverloads+"/"+routeAttempts+" overloaded)");
// Backtrack
- finish(ROUTE_NOT_FOUND, null);
+ finish(ROUTE_NOT_FOUND, null, false);
node.failureTable.onFailure(key, htl, new PeerNode[] {
source }, null, -1, System.currentTimeMillis());
return;
}
@@ -347,7 +544,7 @@
// Fatal timeout
next.localRejectedOverload("FatalTimeout");
forwardRejectedOverload();
- finish(TIMED_OUT, next);
+ finish(TIMED_OUT, next, false);
node.failureTable.onFailure(key, htl, new PeerNode[] {
source }, next, -1, System.currentTimeMillis());
return;
}
@@ -358,7 +555,7 @@
if(msg.getSpec() == DMT.FNPDataNotFound) {
next.successNotOverload();
- finish(DATA_NOT_FOUND, next);
+ finish(DATA_NOT_FOUND, next, false);
node.failureTable.onFailure(key, htl, new PeerNode[] {
source }, next, FailureTable.REJECT_TIME, System.currentTimeMillis());
return;
}
@@ -420,7 +617,7 @@
// Kill the request, regardless of whether
there is timeout left.
// If there is, we will avoid sending requests for the
specified period.
// FIXME we need to create the FT entry.
- finish(RECENTLY_FAILED, next);
+ finish(RECENTLY_FAILED, next, false);
node.failureTable.onFailure(key, htl, new PeerNode[] {
source }, next, timeLeft, System.currentTimeMillis());
return;
}
@@ -486,18 +683,18 @@
verifyAndCommit(data);
} catch (KeyVerifyException e1) {
Logger.normal(this, "Got data
but verify failed: "+e1, e1);
- finish(VERIFY_FAILURE, next);
+ finish(VERIFY_FAILURE, next,
false);
node.failureTable.onFailure(key, htl,
new PeerNode[] { source }, next, -1, System.currentTimeMillis());
return;
}
- finish(SUCCESS, next);
+ finish(SUCCESS, next, false);
return;
} catch (RetrievalException e) {
if
(e.getReason()==RetrievalException.SENDER_DISCONNECTED)
Logger.normal(this, "Transfer failed (disconnect): "+e, e);
else
Logger.error(this, "Transfer failed
("+e.getReason()+"/"+RetrievalException.getErrString(e.getReason())+"): "+e, e);
- finish(TRANSFER_FAILED, next);
+ finish(TRANSFER_FAILED, next, false);
node.failureTable.onFailure(key, htl, new
PeerNode[] { source }, next, -1, System.currentTimeMillis());
return;
}
@@ -570,18 +767,41 @@
node.storeShallow(block);
if(node.random.nextInt(RANDOM_REINSERT_INTERVAL) == 0)
node.queueRandomReinsert(block);
- finish(SUCCESS, next);
+ finish(SUCCESS, next, false);
} catch (SSKVerifyException e) {
Logger.error(this, "Failed to verify: "+e+" from
"+next, e);
- finish(VERIFY_FAILURE, next);
+ finish(VERIFY_FAILURE, next, false);
return;
} catch (KeyCollisionException e) {
Logger.normal(this, "Collision on "+this);
- finish(SUCCESS, next);
+ finish(SUCCESS, next, false);
}
}
/**
+ * Finish fetching an SSK. We must have received the data, the headers and
the pubkey by this point.
+ * @param next The node we received the data from.
+ * @return True if the request has completed. False if we need to look
elsewhere.
+ */
+ private boolean finishSSKFromGetOffer(PeerNode next) {
+ try {
+ block = new SSKBlock(sskData, headers, (NodeSSK)key,
false);
+ node.storeShallow(block);
+ if(node.random.nextInt(RANDOM_REINSERT_INTERVAL) == 0)
+ node.queueRandomReinsert(block);
+ finish(SUCCESS, next, true);
+ return true;
+ } catch (SSKVerifyException e) {
+ Logger.error(this, "Failed to verify (from get offer):
"+e+" from "+next, e);
+ return false;
+ } catch (KeyCollisionException e) {
+ Logger.normal(this, "Collision (from get offer) on
"+this);
+ finish(SUCCESS, next, true);
+ return false;
+ }
+ }
+
+ /**
* Note that this must be first on the list.
*/
private MessageFilter makeDataFoundFilter(PeerNode next) {
@@ -696,7 +916,7 @@
}
}
- private void finish(int code, PeerNode next) {
+ private void finish(int code, PeerNode next, boolean fromOfferedKey) {
if(logMINOR) Logger.minor(this, "finish("+code+ ')');
synchronized(this) {
@@ -710,18 +930,21 @@
if(next != null) {
next.onSuccess(false, key instanceof NodeSSK);
}
- node.nodeStats.requestCompleted(true, source != null, key
instanceof NodeSSK);
+ // FIXME should this be called when fromOfferedKey??
+ node.nodeStats.requestCompleted(true, source != null,
key instanceof NodeSSK);
//NOTE: because of the requesthandler implementation,
this will block and wait
// for downstream transfers on a CHK. The opennet
stuff introduces
// a delay of it's own if we don't get the
expected message.
fireRequestSenderFinished(code);
- if(key instanceof NodeCHK && next != null &&
- (next.isOpennet() ||
node.passOpennetRefsThroughDarknet()) ) {
- finishOpennet(next);
- } else
- finishOpennetNull(next);
+ if(fromOfferedKey) {
+ if(key instanceof NodeCHK && next != null &&
+ (next.isOpennet() ||
node.passOpennetRefsThroughDarknet()) ) {
+ finishOpennet(next);
+ } else
+ finishOpennetNull(next);
+ }
} else {
node.nodeStats.requestCompleted(false, source != null, key
instanceof NodeSSK);
fireRequestSenderFinished(code);
Modified: trunk/freenet/src/freenet/node/RequestStarter.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestStarter.java 2008-02-02 17:34:22 UTC
(rev 17478)
+++ trunk/freenet/src/freenet/node/RequestStarter.java 2008-02-02 20:05:30 UTC
(rev 17479)
@@ -117,7 +117,7 @@
} while(now < sleepUntil);
String reason;
if(LOCAL_REQUESTS_COMPETE_FAIRLY) {
- if((reason =
stats.shouldRejectRequest(true, isInsert, isSSK, true, null)) != null) {
+ if((reason =
stats.shouldRejectRequest(true, isInsert, isSSK, true, false, null)) != null) {
if(logMINOR)
Logger.minor(this, "Not
sending local request: "+reason);
// Wait one throttle-delay
before trying again
Modified: trunk/freenet/src/freenet/node/SSKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertHandler.java 2008-02-02
17:34:22 UTC (rev 17478)
+++ trunk/freenet/src/freenet/node/SSKInsertHandler.java 2008-02-02
20:05:30 UTC (rev 17479)
@@ -84,7 +84,7 @@
Logger.error(this, "Caught "+t, t);
} finally {
if(logMINOR) Logger.minor(this, "Exiting InsertHandler.run() for
"+uid);
- node.unlockUID(uid, true, true, false);
+ node.unlockUID(uid, true, true, false, false);
}
}