Author: toad
Date: 2006-01-06 21:18:26 +0000 (Fri, 06 Jan 2006)
New Revision: 7770
Added:
trunk/freenet/src/freenet/node/AnyInsertSender.java
trunk/freenet/src/freenet/node/CHKInsertSender.java
Removed:
trunk/freenet/src/freenet/node/InsertSender.java
Modified:
trunk/freenet/src/freenet/io/comm/DMT.java
trunk/freenet/src/freenet/keys/CHKBlock.java
trunk/freenet/src/freenet/keys/ClientCHKBlock.java
trunk/freenet/src/freenet/keys/KeyBlock.java
trunk/freenet/src/freenet/keys/SSKBlock.java
trunk/freenet/src/freenet/node/InsertHandler.java
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java
trunk/freenet/src/freenet/node/RequestHandler.java
trunk/freenet/src/freenet/node/Version.java
trunk/freenet/src/freenet/store/BaseFreenetStore.java
trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java
Log:
324:
Implement SSKInsertSender.
Modified: trunk/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/DMT.java 2006-01-06 19:26:42 UTC (rev
7769)
+++ trunk/freenet/src/freenet/io/comm/DMT.java 2006-01-06 21:18:26 UTC (rev
7770)
@@ -23,6 +23,7 @@
import java.util.List;
import freenet.keys.Key;
+import freenet.keys.NodeSSK;
import freenet.support.BitArray;
import freenet.support.Buffer;
import freenet.support.ShortBuffer;
@@ -80,6 +81,9 @@
public static final String STREAM_SEQNO = "streamSequenceNumber";
public static final String IS_LOCAL = "isLocal";
public static final String ANY_TIMED_OUT = "anyTimedOut";
+ public static final String PUBKEY_HASH = "pubkeyHash";
+ public static final String NEED_PUB_KEY = "needPubKey";
+ public static final String PUBKEY_AS_BYTES = "pubkeyAsBytes";
//Diagnostic
public static final MessageType ping = new MessageType("ping") {{
@@ -666,7 +670,83 @@
return "Receive failed";
return "Unknown reason code: "+reason;
}
+
+ public static final MessageType FNPSSKInsertRequest = new
MessageType("FNPSSKInsertRequest") {{
+ addField(UID, Long.class);
+ addField(HTL, Short.class);
+ addField(KEY, NodeSSK.class);
+ addField(NEAREST_LOCATION, Double.class);
+ addField(BLOCK_HEADERS, ShortBuffer.class);
+ addField(PUBKEY_HASH, ShortBuffer.class);
+ }};
+ public static Message createFNPSSKInsertRequest(long uid, short htl,
NodeSSK myKey, double closestLocation, byte[] headers, byte[] pubKeyHash) {
+ Message msg = new Message(FNPSSKInsertRequest);
+ msg.set(UID, uid);
+ msg.set(HTL, htl);
+ msg.set(KEY, myKey);
+ msg.set(NEAREST_LOCATION, closestLocation);
+ msg.set(BLOCK_HEADERS, new ShortBuffer(headers));
+ msg.set(PUBKEY_HASH, new ShortBuffer(pubKeyHash));
+ return msg;
+ }
+
+ public static final MessageType FNPSSKDataFound = new
MessageType("FNPSSKDataFound") {{
+ addField(UID, Long.class);
+ addField(HTL, Short.class);
+ addField(KEY, NodeSSK.class);
+ addField(NEAREST_LOCATION, Double.class);
+ addField(BLOCK_HEADERS, ShortBuffer.class);
+ addField(PUBKEY_HASH, ShortBuffer.class);
+ }};
+
+ public static Message createFNPSSKDataFound(long uid, short htl,
NodeSSK myKey, double closestLocation, byte[] headers, byte[] pubKeyHash) {
+ Message msg = new Message(FNPSSKDataFound);
+ msg.set(UID, uid);
+ msg.set(HTL, htl);
+ msg.set(KEY, myKey);
+ msg.set(NEAREST_LOCATION, closestLocation);
+ msg.set(BLOCK_HEADERS, new ShortBuffer(headers));
+ msg.set(PUBKEY_HASH, new ShortBuffer(pubKeyHash));
+ return msg;
+ }
+
+
+
+ public static MessageType FNPSSKAccepted = new
MessageType("FNPSSKAccepted") {{
+ addField(UID, Long.class);
+ addField(NEED_PUB_KEY, Boolean.class);
+ }};
+
+ public static final Message createFNPSSKAccepted(long uid, boolean
needPubKey) {
+ Message msg = new Message(FNPSSKAccepted);
+ msg.set(UID, uid);
+ msg.set(NEED_PUB_KEY, needPubKey);
+ return msg;
+ }
+
+ public static MessageType FNPSSKPubKey = new
MessageType("FNPSSKPubKey") {{
+ addField(UID, Long.class);
+ addField(PUBKEY_AS_BYTES, ShortBuffer.class);
+ }};
+
+ public static Message createFNPSSKPubKey(long uid, byte[] pubkey) {
+ Message msg = new Message(FNPSSKPubKey);
+ msg.set(UID, uid);
+ msg.set(PUBKEY_AS_BYTES, new ShortBuffer(pubkey));
+ return msg;
+ }
+
+ public static MessageType FNPSSKPubKeyAccepted = new
MessageType("FNPSSKPubKeyAccepted") {{
+ addField(UID, Long.class);
+ }};
+
+ public static Message createFNPSSKPubKeyAccepted(long uid) {
+ Message msg = new Message(FNPSSKPubKeyAccepted);
+ msg.set(UID, uid);
+ return msg;
+ }
+
public static final MessageType FNPPing = new MessageType("FNPPing") {{
addField(PING_SEQNO, Integer.class);
}};
@@ -812,7 +892,7 @@
msg.set(HTL, htl);
return msg;
}
-
+
public static void init() { }
}
Modified: trunk/freenet/src/freenet/keys/CHKBlock.java
===================================================================
--- trunk/freenet/src/freenet/keys/CHKBlock.java 2006-01-06 19:26:42 UTC
(rev 7769)
+++ trunk/freenet/src/freenet/keys/CHKBlock.java 2006-01-06 21:18:26 UTC
(rev 7770)
@@ -1,26 +1,8 @@
package freenet.keys;
-import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
-import java.util.Arrays;
-import freenet.crypt.BlockCipher;
-import freenet.crypt.PCFBMode;
-import freenet.crypt.UnsupportedCipherException;
-import freenet.crypt.ciphers.Rijndael;
-import freenet.node.Node;
-import freenet.support.ArrayBucket;
-import freenet.support.ArrayBucketFactory;
-import freenet.support.Bucket;
-import freenet.support.BucketFactory;
-import freenet.support.BucketTools;
-import freenet.support.Logger;
-import freenet.support.SimpleReadOnlyArrayBucket;
-import freenet.support.compress.CompressionOutputSizeException;
-import freenet.support.compress.Compressor;
-import freenet.support.compress.DecompressException;
-
/**
* @author amphibian
*
@@ -30,7 +12,7 @@
public class CHKBlock implements KeyBlock {
final byte[] data;
- final byte[] header;
+ final byte[] headers;
final short hashIdentifier;
final NodeCHK chk;
public static final int MAX_LENGTH_BEFORE_COMPRESSION = Integer.MAX_VALUE;
@@ -43,8 +25,8 @@
/**
* @return The header for this key. DO NOT MODIFY THIS DATA!
*/
- public byte[] getHeader() {
- return header;
+ public byte[] getHeaders() {
+ return headers;
}
/**
@@ -60,10 +42,10 @@
public CHKBlock(byte[] data2, byte[] header2, NodeCHK key, boolean verify)
throws CHKVerifyException {
data = data2;
- header = header2;
- if(header.length != TOTAL_HEADERS_LENGTH)
- throw new IllegalArgumentException("Wrong length:
"+header.length+" should be "+TOTAL_HEADERS_LENGTH);
- hashIdentifier = (short)(((header[0] & 0xff) << 8) + (header[1] &
0xff));
+ headers = header2;
+ if(headers.length != TOTAL_HEADERS_LENGTH)
+ throw new IllegalArgumentException("Wrong length:
"+headers.length+" should be "+TOTAL_HEADERS_LENGTH);
+ hashIdentifier = (short)(((headers[0] & 0xff) << 8) + (headers[1] &
0xff));
this.chk = key;
// Logger.debug(CHKBlock.class, "Data length: "+data.length+", header
length: "+header.length);
if(!verify) return;
@@ -79,7 +61,7 @@
throw new Error(e);
}
- md.update(header);
+ md.update(headers);
md.update(data);
byte[] hash = md.digest();
byte[] check = chk.routingKey;
@@ -92,4 +74,12 @@
public Key getKey() {
return chk;
}
+
+ public byte[] getRawHeaders() {
+ return headers;
+ }
+
+ public byte[] getRawData() {
+ return data;
+ }
}
Modified: trunk/freenet/src/freenet/keys/ClientCHKBlock.java
===================================================================
--- trunk/freenet/src/freenet/keys/ClientCHKBlock.java 2006-01-06 19:26:42 UTC
(rev 7769)
+++ trunk/freenet/src/freenet/keys/ClientCHKBlock.java 2006-01-06 21:18:26 UTC
(rev 7770)
@@ -55,7 +55,7 @@
* Construct from a CHKBlock and a key.
*/
public ClientCHKBlock(CHKBlock block, ClientCHK key2) throws
CHKVerifyException {
- this(block.getData(), block.getHeader(), key2, true);
+ this(block.getData(), block.getHeaders(), key2, true);
}
/**
@@ -98,8 +98,8 @@
throw new CHKDecodeException("Crypto key too short");
cipher.initialize(key.cryptoKey);
PCFBMode pcfb = new PCFBMode(cipher);
- byte[] hbuf = new byte[header.length-2];
- System.arraycopy(header, 2, hbuf, 0, header.length-2);
+ byte[] hbuf = new byte[headers.length-2];
+ System.arraycopy(headers, 2, hbuf, 0, headers.length-2);
byte[] dbuf = new byte[data.length];
System.arraycopy(data, 0, dbuf, 0, data.length);
// Decipher header first - functions as IV
Modified: trunk/freenet/src/freenet/keys/KeyBlock.java
===================================================================
--- trunk/freenet/src/freenet/keys/KeyBlock.java 2006-01-06 19:26:42 UTC
(rev 7769)
+++ trunk/freenet/src/freenet/keys/KeyBlock.java 2006-01-06 21:18:26 UTC
(rev 7770)
@@ -1,10 +1,5 @@
package freenet.keys;
-import java.io.IOException;
-
-import freenet.support.Bucket;
-import freenet.support.BucketFactory;
-
/**
* Interface for fetched blocks. Can be decoded with a key.
*/
@@ -12,4 +7,7 @@
final static int HASH_SHA256 = 1;
+ public Key getKey();
+ public byte[] getRawHeaders();
+ public byte[] getRawData();
}
Modified: trunk/freenet/src/freenet/keys/SSKBlock.java
===================================================================
--- trunk/freenet/src/freenet/keys/SSKBlock.java 2006-01-06 19:26:42 UTC
(rev 7769)
+++ trunk/freenet/src/freenet/keys/SSKBlock.java 2006-01-06 21:18:26 UTC
(rev 7770)
@@ -109,4 +109,16 @@
throw new SSKVerifyException("E(H(docname)) wrong -
wrong key??");
}
+ public Key getKey() {
+ return nodeKey;
+ }
+
+ public byte[] getRawHeaders() {
+ return headers;
+ }
+
+ public byte[] getRawData() {
+ return data;
+ }
+
}
Copied: trunk/freenet/src/freenet/node/AnyInsertSender.java (from rev 7767,
trunk/freenet/src/freenet/node/InsertSender.java)
===================================================================
--- trunk/freenet/src/freenet/node/InsertSender.java 2006-01-06 12:27:49 UTC
(rev 7767)
+++ trunk/freenet/src/freenet/node/AnyInsertSender.java 2006-01-06 21:18:26 UTC
(rev 7770)
@@ -0,0 +1,16 @@
+package freenet.node;
+
+public interface AnyInsertSender {
+
+ public abstract int getStatus();
+
+ public abstract short getHTL();
+
+ /**
+ * @return The current status as a string
+ */
+ public abstract String getStatusString();
+
+ public abstract boolean sentRequest();
+
+}
\ No newline at end of file
Copied: trunk/freenet/src/freenet/node/CHKInsertSender.java (from rev 7767,
trunk/freenet/src/freenet/node/InsertSender.java)
===================================================================
--- trunk/freenet/src/freenet/node/InsertSender.java 2006-01-06 12:27:49 UTC
(rev 7767)
+++ trunk/freenet/src/freenet/node/CHKInsertSender.java 2006-01-06 21:18:26 UTC
(rev 7770)
@@ -0,0 +1,799 @@
+package freenet.node;
+
+import java.util.HashSet;
+import java.util.Vector;
+
+import freenet.io.comm.DMT;
+import freenet.io.comm.DisconnectedException;
+import freenet.io.comm.Message;
+import freenet.io.comm.MessageFilter;
+import freenet.io.comm.NotConnectedException;
+import freenet.io.xfer.AbortedException;
+import freenet.io.xfer.BlockTransmitter;
+import freenet.io.xfer.PartiallyReceivedBlock;
+import freenet.keys.CHKBlock;
+import freenet.keys.CHKVerifyException;
+import freenet.keys.NodeCHK;
+import freenet.support.Logger;
+
+public final class CHKInsertSender implements Runnable, AnyInsertSender {
+
+ private class AwaitingCompletion {
+
+ /** Node we are waiting for response from */
+ final PeerNode pn;
+ /** We may be sending data to that node */
+ BlockTransmitter bt;
+ /** Have we received notice of the downstream success
+ * or failure of dependant transfers from that node?
+ * Includes timing out. */
+ boolean receivedCompletionNotice = false;
+ /** Timed out - didn't receive completion notice in
+ * the allotted time?? */
+ boolean completionTimedOut = false;
+ /** Was the notification of successful transfer? */
+ boolean completionSucceeded;
+
+ /** Have we completed the immediate transfer? */
+ boolean completedTransfer = false;
+ /** Did it succeed? */
+ boolean transferSucceeded = false;
+
+ AwaitingCompletion(PeerNode pn, PartiallyReceivedBlock prb) {
+ this.pn = pn;
+ bt = new BlockTransmitter(node.usm, pn, uid, prb);
+ Sender s = new Sender(this);
+ Thread senderThread = new Thread(s, "Sender for "+uid+" to
"+pn.getPeer());
+ senderThread.setDaemon(true);
+ senderThread.start();
+ }
+
+ void completed(boolean timeout, boolean success) {
+ synchronized(this) {
+ if(timeout)
+ completionTimedOut = true;
+ else
+ completionSucceeded = success;
+ receivedCompletionNotice = true;
+ notifyAll();
+ }
+ synchronized(nodesWaitingForCompletion) {
+ nodesWaitingForCompletion.notifyAll();
+ }
+ if(!success) {
+ synchronized(CHKInsertSender.this) {
+ transferTimedOut = true;
+ CHKInsertSender.this.notifyAll();
+ }
+ }
+ }
+
+ void completedTransfer(boolean success) {
+ synchronized(this) {
+ transferSucceeded = success;
+ completedTransfer = true;
+ notifyAll();
+ }
+ synchronized(nodesWaitingForCompletion) {
+ nodesWaitingForCompletion.notifyAll();
+ }
+ if(!success) {
+ synchronized(CHKInsertSender.this) {
+ transferTimedOut = true;
+ CHKInsertSender.this.notifyAll();
+ }
+ }
+ }
+ }
+
+ public class Sender implements Runnable {
+
+ final AwaitingCompletion completion;
+ final BlockTransmitter bt;
+
+ public Sender(AwaitingCompletion ac) {
+ this.bt = ac.bt;
+ this.completion = ac;
+ }
+
+ public void run() {
+ try {
+ bt.send();
+ if(bt.failedDueToOverload()) {
+ completion.completedTransfer(false);
+ } else {
+ completion.completedTransfer(true);
+ }
+ } catch (Throwable t) {
+ completion.completedTransfer(false);
+ Logger.error(this, "Caught "+t, t);
+ }
+ }
+ }
+
+ CHKInsertSender(NodeCHK myKey, long uid, byte[] headers, short htl,
+ PeerNode source, Node node, PartiallyReceivedBlock prb, boolean
fromStore, double closestLocation) {
+ this.myKey = myKey;
+ this.target = myKey.toNormalizedDouble();
+ this.uid = uid;
+ this.headers = headers;
+ this.htl = htl;
+ this.source = source;
+ this.node = node;
+ this.prb = prb;
+ this.fromStore = fromStore;
+ this.closestLocation = closestLocation;
+ this.startTime = System.currentTimeMillis();
+ this.nodesWaitingForCompletion = new Vector();
+ Thread t = new Thread(this, "CHKInsertSender for UID "+uid+" on
"+node.portNumber+" at "+System.currentTimeMillis());
+ t.setDaemon(true);
+ t.start();
+ }
+
+ // Constants
+ static final int ACCEPTED_TIMEOUT = 10000;
+ static final int SEARCH_TIMEOUT = 60000;
+ static final int TRANSFER_COMPLETION_TIMEOUT = 120000;
+
+ // Basics
+ final NodeCHK myKey;
+ final double target;
+ final long uid;
+ short htl;
+ final PeerNode source;
+ final Node node;
+ final byte[] headers; // received BEFORE creation => we handle Accepted
elsewhere
+ final PartiallyReceivedBlock prb;
+ final boolean fromStore;
+ private boolean receiveFailed = false;
+ final double closestLocation;
+ final long startTime;
+ private boolean sentRequest;
+
+ /** List of nodes we are waiting for either a transfer completion
+ * notice or a transfer completion from. */
+ private Vector nodesWaitingForCompletion;
+
+ /** Have all transfers completed and all nodes reported completion status?
*/
+ private boolean allTransfersCompleted = false;
+
+ /** Has a transfer timed out, either directly or downstream? */
+ private boolean transferTimedOut = false;
+
+ /** Runnable which waits for completion of all transfers */
+ private CompletionWaiter cw = null;
+
+ /** Time at which we set status to a value other than NOT_FINISHED */
+ private long setStatusTime = -1;
+
+
+ private int status = -1;
+ /** Still running */
+ static final int NOT_FINISHED = -1;
+ /** Successful insert */
+ static final int SUCCESS = 0;
+ /** Route not found */
+ static final int ROUTE_NOT_FOUND = 1;
+ /** Internal error */
+ static final int INTERNAL_ERROR = 3;
+ /** Timed out waiting for response */
+ static final int TIMED_OUT = 4;
+ /** Locally Generated a RejectedOverload */
+ static final int GENERATED_REJECTED_OVERLOAD = 5;
+ /** Could not get off the node at all! */
+ static final int ROUTE_REALLY_NOT_FOUND = 6;
+
+ public String toString() {
+ return super.toString()+" for "+uid;
+ }
+
+ public void run() {
+ short origHTL = htl;
+ try {
+ realRun();
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t, t);
+ if(status == NOT_FINISHED)
+ finish(INTERNAL_ERROR, null);
+ } finally {
+ node.completed(uid);
+ node.removeInsertSender(myKey, origHTL, this);
+ }
+ }
+
+ private void realRun() {
+ HashSet nodesRoutedTo = new HashSet();
+ HashSet nodesNotIgnored = new HashSet();
+
+ while(true) {
+ if(receiveFailed) return; // don't need to set status as killed by
InsertHandler
+
+ if(htl == 0) {
+ // Send an InsertReply back
+ finish(SUCCESS, null);
+ return;
+ }
+
+ // Route it
+ PeerNode next;
+ // Can backtrack, so only route to nodes closer than we are to
target.
+ double nextValue;
+ synchronized(node.peers) {
+ next = node.peers.closerPeer(source, nodesRoutedTo,
nodesNotIgnored, target, true);
+ if(next != null)
+ nextValue = next.getLocation().getValue();
+ else
+ nextValue = -1.0;
+ }
+
+ if(next == null) {
+ // Backtrack
+ finish(ROUTE_NOT_FOUND, null);
+ return;
+ }
+ Logger.minor(this, "Routing insert to "+next);
+ nodesRoutedTo.add(next);
+
+ if(Math.abs(target - nextValue) > Math.abs(target -
closestLocation)) {
+ Logger.minor(this, "Backtracking: target="+target+"
next="+nextValue+" closest="+closestLocation);
+ htl = node.decrementHTL(source, htl);
+ }
+
+ Message req = DMT.createFNPInsertRequest(uid, htl, myKey,
closestLocation);
+
+ // Wait for ack or reject... will come before even a locally
generated DataReply
+
+ MessageFilter mfAccepted =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPAccepted);
+ MessageFilter mfRejectedLoop =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedLoop);
+ MessageFilter mfRejectedOverload =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedOverload);
+
+ // mfRejectedOverload must be the last thing in the or
+ // So its or pointer remains null
+ // Otherwise we need to recreate it below
+ mfRejectedOverload.clearOr();
+ MessageFilter mf =
mfAccepted.or(mfRejectedLoop.or(mfRejectedOverload));
+
+ // Send to next node
+
+ try {
+ next.send(req);
+ } catch (NotConnectedException e1) {
+ Logger.minor(this, "Not connected to "+next);
+ continue;
+ }
+ sentRequest = true;
+
+ if(receiveFailed) return; // don't need to set status as killed by
InsertHandler
+ Message msg = null;
+
+ /*
+ * Because messages may be re-ordered, it is
+ * entirely possible that we get a non-local RejectedOverload,
+ * followed by an Accepted. So we must loop here.
+ */
+
+ while (true) {
+
+ try {
+ msg = node.usm.waitFor(mf);
+ } catch (DisconnectedException e) {
+ Logger.normal(this, "Disconnected from
" + next
+ + " while waiting for
Accepted");
+ break;
+ }
+
+ if (receiveFailed)
+ return; // don't need to set status as
killed by InsertHandler
+
+ if (msg == null) {
+ // Terminal overload
+ // Try to propagate back to source
+ Logger.minor(this, "Timeout");
+ next.localRejectedOverload();
+ finish(TIMED_OUT, next);
+ return;
+ }
+
+ if (msg.getSpec() == DMT.FNPRejectedOverload) {
+ // Non-fatal - probably still have time
left
+ if (msg.getBoolean(DMT.IS_LOCAL)) {
+ next.localRejectedOverload();
+ Logger.minor(this,
+
"Local RejectedOverload, moving on to next peer");
+ // Give up on this one, try
another
+ break;
+ } else {
+ forwardRejectedOverload();
+ }
+ continue;
+ }
+
+ if (msg.getSpec() == DMT.FNPRejectedLoop) {
+ next.successNotOverload();
+ // Loop - we don't want to send the
data to this one
+ break;
+ }
+
+ if (msg.getSpec() != DMT.FNPAccepted) {
+ Logger.error(this,
+ "Unexpected message
waiting for Accepted: "
+ + msg);
+ break;
+ }
+ // Otherwise is an FNPAccepted
+ break;
+ }
+
+ if(msg == null || msg.getSpec() != DMT.FNPAccepted) continue;
+
+ Logger.minor(this, "Got Accepted on "+this);
+
+ // Send them the data.
+ // Which might be the new data resulting from a collision...
+
+ Message dataInsert;
+ PartiallyReceivedBlock prbNow;
+ prbNow = prb;
+ dataInsert = DMT.createFNPDataInsert(uid, headers);
+ /** What are we waiting for now??:
+ * - FNPRouteNotFound - couldn't exhaust HTL, but send us the
+ * data anyway please
+ * - FNPInsertReply - used up all HTL, yay
+ * - FNPRejectOverload - propagating an overload error :(
+ * - FNPRejectTimeout - we took too long to send the DataInsert
+ * - FNPDataInsertRejected - the insert was invalid
+ */
+
+ MessageFilter mfInsertReply =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(SEARCH_TIMEOUT).setType(DMT.FNPInsertReply);
+ mfRejectedOverload.setTimeout(SEARCH_TIMEOUT);
+ mfRejectedOverload.clearOr();
+ MessageFilter mfRouteNotFound =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(SEARCH_TIMEOUT).setType(DMT.FNPRouteNotFound);
+ MessageFilter mfDataInsertRejected =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(SEARCH_TIMEOUT).setType(DMT.FNPDataInsertRejected);
+ MessageFilter mfTimeout =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(SEARCH_TIMEOUT).setType(DMT.FNPRejectedTimeout);
+
+ mf =
mfInsertReply.or(mfRouteNotFound.or(mfDataInsertRejected.or(mfTimeout.or(mfRejectedOverload))));
+
+ Logger.minor(this, "Sending DataInsert");
+ if(receiveFailed) return;
+ try {
+ next.send(dataInsert);
+ } catch (NotConnectedException e1) {
+ Logger.minor(this, "Not connected sending
DataInsert: "+next+" for "+uid);
+ continue;
+ }
+
+ Logger.minor(this, "Sending data");
+ if(receiveFailed) return;
+ AwaitingCompletion ac = new AwaitingCompletion(next, prbNow);
+ synchronized(nodesWaitingForCompletion) {
+ nodesWaitingForCompletion.add(ac);
+ nodesWaitingForCompletion.notifyAll();
+ }
+ makeCompletionWaiter();
+
+ while (true) {
+
+ if (receiveFailed)
+ return;
+
+ try {
+ msg = node.usm.waitFor(mf);
+ } catch (DisconnectedException e) {
+ Logger.normal(this, "Disconnected from
" + next
+ + " while waiting for
InsertReply on " + this);
+ break;
+ }
+ if (receiveFailed)
+ return;
+
+ if (msg == null || msg.getSpec() ==
DMT.FNPRejectedTimeout) {
+ // Timeout :(
+ // Fairly serious problem
+ Logger.error(this, "Timeout (" + msg
+ + ") after Accepted in
insert");
+ // Terminal overload
+ // Try to propagate back to source
+ next.localRejectedOverload();
+ finish(TIMED_OUT, next);
+ return;
+ }
+
+ if (msg.getSpec() == DMT.FNPRejectedOverload) {
+ // Probably non-fatal, if so, we have
time left, can try next one
+ if (msg.getBoolean(DMT.IS_LOCAL)) {
+ next.localRejectedOverload();
+ Logger.minor(this,
+ "Local
RejectedOverload, moving on to next peer");
+ // Give up on this one, try
another
+ break;
+ } else {
+ forwardRejectedOverload();
+ }
+ continue; // Wait for any further
response
+ }
+
+ if (msg.getSpec() == DMT.FNPRouteNotFound) {
+ Logger.minor(this, "Rejected: RNF");
+ short newHtl = msg.getShort(DMT.HTL);
+ if (htl > newHtl)
+ htl = newHtl;
+ // Finished as far as this node is
concerned
+ next.successNotOverload();
+ break;
+ }
+
+ if (msg.getSpec() == DMT.FNPDataInsertRejected)
{
+ next.successNotOverload();
+ short reason = msg
+
.getShort(DMT.DATA_INSERT_REJECTED_REASON);
+ Logger.minor(this, "DataInsertRejected:
" + reason);
+ if (reason ==
DMT.DATA_INSERT_REJECTED_VERIFY_FAILED) {
+ if (fromStore) {
+ // That's odd...
+
Logger.error(this,"Verify failed on next node "
+ + next
+ " for DataInsert but we were sending from the store!");
+ } else {
+ try {
+ if
(!prb.allReceived())
+
Logger.error(this,
+
"Did not receive all packets but next node says invalid anyway!");
+ else {
+ //
Check the data
+ new
CHKBlock(prb.getBlock(), headers,
+
myKey);
+
Logger.error(this,
+
"Verify failed on " + next
+
+ " but data was valid!");
+ }
+ } catch
(CHKVerifyException e) {
+ Logger
+
.normal(this,
+
"Verify failed because data was invalid");
+ } catch
(AbortedException e) {
+ receiveFailed =
true;
+ }
+ }
+ break; // What else can we do?
+ } else if (reason ==
DMT.DATA_INSERT_REJECTED_RECEIVE_FAILED) {
+ if (receiveFailed) {
+ Logger.minor(this,
"Failed to receive data, so failed to send data");
+ } else {
+ try {
+ if
(prb.allReceived()) {
+
Logger.error(this, "Received all data but send failed to " + next);
+ } else {
+ if
(prb.isAborted()) {
+
Logger.normal(this, "Send failed: aborted: " + prb.getAbortReason() + ": " +
prb.getAbortDescription());
+ } else
+
Logger.normal(this, "Send failed; have not yet received all data but not
aborted: " + next);
+ }
+ } catch
(AbortedException e) {
+ receiveFailed =
true;
+ }
+ }
+ }
+ Logger.error(this, "DataInsert
rejected! Reason="
+ +
DMT.getDataInsertRejectedReason(reason));
+ break;
+ }
+
+ if (msg.getSpec() != DMT.FNPInsertReply) {
+ Logger.error(this, "Unknown reply: " +
msg);
+ finish(INTERNAL_ERROR, next);
+ }
+
+ // Our task is complete
+ next.successNotOverload();
+ finish(SUCCESS, next);
+ return;
+ }
+ }
+ }
+
+ private boolean hasForwardedRejectedOverload = false;
+
+ synchronized boolean receivedRejectedOverload() {
+ return hasForwardedRejectedOverload;
+ }
+
+ /** Forward RejectedOverload to the request originator.
+ * DO NOT CALL if have a *local* RejectedOverload.
+ */
+ private synchronized void forwardRejectedOverload() {
+ if(hasForwardedRejectedOverload) return;
+ hasForwardedRejectedOverload = true;
+ notifyAll();
+ }
+
+ private void finish(int code, PeerNode next) {
+ Logger.minor(this, "Finished: "+code+" on "+this, new
Exception("debug"));
+ if(status != NOT_FINISHED)
+ throw new IllegalStateException("finish() called with "+code+"
when was already "+status);
+
+ setStatusTime = System.currentTimeMillis();
+
+ if(code == ROUTE_NOT_FOUND && !sentRequest)
+ code = ROUTE_REALLY_NOT_FOUND;
+
+ status = code;
+
+ synchronized(this) {
+ notifyAll();
+ }
+
+ Logger.minor(this, "Set status code: "+getStatusString()+" on "+uid);
+
+ // Now wait for transfers, or for downstream transfer notifications.
+
+ synchronized(this) {
+ if(cw != null) {
+ while(!allTransfersCompleted) {
+ try {
+ wait(10*1000);
+ } catch (InterruptedException e) {
+ // Try again
+ }
+ }
+ } else {
+ // There weren't any transfers
+ allTransfersCompleted = true;
+ }
+ notifyAll();
+ }
+
+ Logger.minor(this, "Returning from finish()");
+ }
+
+ public int getStatus() {
+ return status;
+ }
+
+ public short getHTL() {
+ return htl;
+ }
+
+ /**
+ * Called by InsertHandler to notify that the receive has
+ * failed.
+ */
+ public void receiveFailed() {
+ receiveFailed = true;
+ }
+
+ /**
+ * @return The current status as a string
+ */
+ public String getStatusString() {
+ if(status == SUCCESS)
+ return "SUCCESS";
+ if(status == ROUTE_NOT_FOUND)
+ return "ROUTE NOT FOUND";
+ if(status == NOT_FINISHED)
+ return "NOT FINISHED";
+ if(status == INTERNAL_ERROR)
+ return "INTERNAL ERROR";
+ if(status == TIMED_OUT)
+ return "TIMED OUT";
+ if(status == GENERATED_REJECTED_OVERLOAD)
+ return "GENERATED REJECTED OVERLOAD";
+ if(status == ROUTE_REALLY_NOT_FOUND)
+ return "ROUTE REALLY NOT FOUND";
+ return "UNKNOWN STATUS CODE: "+status;
+ }
+
+ public boolean sentRequest() {
+ return sentRequest;
+ }
+
+ private synchronized void makeCompletionWaiter() {
+ if(cw == null) {
+ cw = new CompletionWaiter();
+ Thread t = new Thread(cw, "Completion waiter for "+uid);
+ t.setDaemon(true);
+ t.start();
+ }
+ }
+
+ private class CompletionWaiter implements Runnable {
+
+ public void run() {
+ Logger.minor(this, "Starting "+this);
+outer: while(true) {
+ AwaitingCompletion[] waiters;
+ synchronized(nodesWaitingForCompletion) {
+ waiters = new
AwaitingCompletion[nodesWaitingForCompletion.size()];
+ waiters = (AwaitingCompletion[])
nodesWaitingForCompletion.toArray(waiters);
+ }
+
+ // First calculate the timeout
+
+ int timeout;
+ boolean noTimeLeft = false;
+
+ long now = System.currentTimeMillis();
+ if(status == NOT_FINISHED) {
+ // Wait 5 seconds, then try again
+ timeout = 5000;
+ } else {
+ // Completed, wait for everything
+ timeout = (int)Math.min(Integer.MAX_VALUE,
(setStatusTime + TRANSFER_COMPLETION_TIMEOUT) - now);
+ }
+ if(timeout <= 0) {
+ noTimeLeft = true;
+ timeout = 1;
+ }
+
+ MessageFilter mf = null;
+ for(int i=0;i<waiters.length;i++) {
+ AwaitingCompletion awc = waiters[i];
+ if(!awc.pn.isConnected()) {
+ Logger.normal(this, "Disconnected:
"+awc.pn+" in "+CHKInsertSender.this);
+ continue;
+ }
+ if(!awc.receivedCompletionNotice) {
+ MessageFilter m =
+
MessageFilter.create().setField(DMT.UID,
uid).setType(DMT.FNPInsertTransfersCompleted).setSource(awc.pn).setTimeout(timeout);
+ if(mf == null)
+ mf = m;
+ else
+ mf = m.or(mf);
+ Logger.minor(this, "Waiting for
"+awc.pn.getPeer());
+ }
+ }
+
+ if(mf == null) {
+ if(status != NOT_FINISHED) {
+ if(nodesWaitingForCompletion.size() !=
waiters.length) {
+ // Added another one
+ Logger.minor(this, "Looping
(mf==null): waiters="+waiters.length+" but
waiting="+nodesWaitingForCompletion.size());
+ continue;
+ }
+ if(waitForCompletedTransfers(waiters,
timeout, noTimeLeft)) {
+
synchronized(CHKInsertSender.this) {
+ allTransfersCompleted =
true;
+
CHKInsertSender.this.notifyAll();
+ }
+ return;
+ }
+ if(noTimeLeft) {
+ for(int
i=0;i<waiters.length;i++) {
+
if(!waiters[i].pn.isConnected()) continue;
+
if(!waiters[i].completedTransfer) {
+
waiters[i].completedTransfer(false);
+ }
+ }
+
synchronized(CHKInsertSender.this) {
+ allTransfersCompleted =
true;
+
CHKInsertSender.this.notifyAll();
+ }
+ return;
+ }
+ // Otherwise, not finished, go back
around loop
+ continue;
+ } else {
+ // Still waiting for request
completion, so more may be added
+ synchronized(nodesWaitingForCompletion)
{
+ try {
+
nodesWaitingForCompletion.wait(timeout);
+ } catch (InterruptedException
e) {
+ // Go back around the
loop
+ }
+ }
+ }
+ continue;
+ } else {
+ Message m;
+ try {
+ m = node.usm.waitFor(mf);
+ } catch (DisconnectedException e) {
+ // Which one? I have no idea.
+ // Go around the loop again.
+ continue;
+ }
+ if(m != null) {
+ // Process message
+ PeerNode pn = (PeerNode) m.getSource();
+ boolean processed = false;
+ for(int i=0;i<waiters.length;i++) {
+ PeerNode p = waiters[i].pn;
+ if(p == pn) {
+ boolean anyTimedOut =
m.getBoolean(DMT.ANY_TIMED_OUT);
+
waiters[i].completed(false, !anyTimedOut);
+ if(anyTimedOut) {
+
synchronized(CHKInsertSender.this) {
+
if(!transferTimedOut) {
+
transferTimedOut = true;
+
CHKInsertSender.this.notifyAll();
+ }
+ }
+ }
+ processed = true;
+ break;
+ }
+ }
+ if(!processed) {
+ Logger.error(this, "Did not
process message: "+m+" on "+this);
+ }
+ } else {
+ if(nodesWaitingForCompletion.size() >
waiters.length) {
+ // Added another one
+ Logger.minor(this, "Looping:
waiters="+waiters.length+" but waiting="+nodesWaitingForCompletion.size());
+ continue;
+ }
+ if(noTimeLeft) {
+ Logger.minor(this, "Overall
timeout on "+CHKInsertSender.this);
+ for(int
i=0;i<waiters.length;i++) {
+
if(!waiters[i].pn.isConnected()) continue;
+
if(!waiters[i].receivedCompletionNotice)
+
waiters[i].completed(false, false);
+
if(!waiters[i].completedTransfer)
+
waiters[i].completedTransfer(false);
+ }
+
synchronized(CHKInsertSender.this) {
+ transferTimedOut = true;
+ allTransfersCompleted =
true;
+
CHKInsertSender.this.notifyAll();
+ }
+ return;
+ }
+ }
+ }
+ }
+ }
+
+ /** @return True if all transfers have completed, false
otherwise. */
+ private boolean waitForCompletedTransfers(AwaitingCompletion[]
waiters, int timeout, boolean noTimeLeft) {
+ // MAYBE all done
+ boolean completedTransfers = true;
+ synchronized(nodesWaitingForCompletion) {
+ for(int i=0;i<waiters.length;i++) {
+ if(!waiters[i].pn.isConnected())
continue;
+ if(!waiters[i].completedTransfer) {
+ completedTransfers = false;
+ break;
+ }
+ }
+ if(!completedTransfers) {
+ try {
+ if(!noTimeLeft) {
+
nodesWaitingForCompletion.wait(timeout);
+ } else {
+ // Timed out
+ }
+ completedTransfers = true;
+ for(int
i=0;i<waiters.length;i++) {
+
if(!waiters[i].pn.isConnected()) continue;
+
if(!waiters[i].completedTransfer) {
+
completedTransfers = false;
+ break;
+ }
+ }
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ }
+ if(completedTransfers) {
+ // All done!
+ Logger.minor(this, "Completed,
status="+getStatusString()+", nothing left to wait for.");
+ synchronized(CHKInsertSender.this) {
+ allTransfersCompleted = true;
+ CHKInsertSender.this.notifyAll();
+ }
+ return true;
+ } else return false;
+ }
+
+ public String toString() {
+ return super.toString()+" for "+uid;
+ }
+ }
+
+ public boolean completed() {
+ return allTransfersCompleted;
+ }
+
+ public boolean anyTransfersFailed() {
+ return transferTimedOut;
+ }
+}
Modified: trunk/freenet/src/freenet/node/InsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/InsertHandler.java 2006-01-06 19:26:42 UTC
(rev 7769)
+++ trunk/freenet/src/freenet/node/InsertHandler.java 2006-01-06 21:18:26 UTC
(rev 7770)
@@ -35,7 +35,7 @@
final long startTime;
private double closestLoc;
private short htl;
- private InsertSender sender;
+ private CHKInsertSender sender;
private byte[] headers;
private BlockReceiver br;
private Thread runThread;
@@ -104,7 +104,7 @@
headers = ((ShortBuffer)msg.getObject(DMT.BLOCK_HEADERS)).getData();
// FIXME check the headers
- // Now create an InsertSender, or use an existing one, or
+ // Now create an CHKInsertSender, or use an existing one, or
// discover that the data is in the store.
// From this point onwards, if we return cleanly we must go through
finish().
@@ -134,16 +134,16 @@
// What do we want to wait for?
// If the data receive completes, that's very nice,
// but doesn't really matter. What matters is what
- // happens to the InsertSender. If the data receive
+ // happens to the CHKInsertSender. If the data receive
// fails, that does matter...
- // We are waiting for a terminal status on the InsertSender,
+ // We are waiting for a terminal status on the CHKInsertSender,
// including REPLIED_WITH_DATA.
// If we get transfer failed, we can check whether the receive
// failed first. If it did it's not our fault.
// If the receive failed, and we haven't started transferring
// yet, we probably want to kill the sender.
- // So we call the wait method on the InsertSender, but we
+ // So we call the wait method on the CHKInsertSender, but we
// also have a flag locally to indicate the receive failed.
// And if it does, we interrupt.
@@ -152,7 +152,7 @@
while(true) {
synchronized(sender) {
try {
- if(sender.getStatus() == InsertSender.NOT_FINISHED)
+ if(sender.getStatus() == CHKInsertSender.NOT_FINISHED)
sender.wait(5000);
} catch (InterruptedException e) {
// Cool, probably this is because the receive failed...
@@ -175,7 +175,7 @@
int status = sender.getStatus();
- if(status == InsertSender.NOT_FINISHED) {
+ if(status == CHKInsertSender.NOT_FINISHED) {
continue;
}
@@ -191,20 +191,20 @@
// Local RejectedOverload's (fatal).
// Internal error counts as overload. It'd only create a timeout
otherwise, which is the same thing anyway.
// We *really* need a good way to deal with nodes that constantly
R_O!
- if(status == InsertSender.TIMED_OUT ||
- status == InsertSender.GENERATED_REJECTED_OVERLOAD ||
- status == InsertSender.INTERNAL_ERROR) {
+ if(status == CHKInsertSender.TIMED_OUT ||
+ status == CHKInsertSender.GENERATED_REJECTED_OVERLOAD ||
+ status == CHKInsertSender.INTERNAL_ERROR) {
msg = DMT.createFNPRejectedOverload(uid, true);
source.send(msg);
// Might as well store it anyway.
- if(status == InsertSender.TIMED_OUT ||
- status ==
InsertSender.GENERATED_REJECTED_OVERLOAD)
+ if(status == CHKInsertSender.TIMED_OUT ||
+ status ==
CHKInsertSender.GENERATED_REJECTED_OVERLOAD)
canCommit = true;
finish();
return;
}
- if(status == InsertSender.ROUTE_NOT_FOUND || status ==
InsertSender.ROUTE_REALLY_NOT_FOUND) {
+ if(status == CHKInsertSender.ROUTE_NOT_FOUND || status ==
CHKInsertSender.ROUTE_REALLY_NOT_FOUND) {
msg = DMT.createFNPRouteNotFound(uid, sender.getHTL());
source.send(msg);
canCommit = true;
@@ -212,7 +212,7 @@
return;
}
- if(status == InsertSender.SUCCESS) {
+ if(status == CHKInsertSender.SUCCESS) {
msg = DMT.createFNPInsertReply(uid);
sentSuccess = true;
source.send(msg);
Deleted: trunk/freenet/src/freenet/node/InsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/InsertSender.java 2006-01-06 19:26:42 UTC
(rev 7769)
+++ trunk/freenet/src/freenet/node/InsertSender.java 2006-01-06 21:18:26 UTC
(rev 7770)
@@ -1,778 +0,0 @@
-package freenet.node;
-
-import java.util.HashSet;
-import java.util.Vector;
-
-import freenet.io.comm.DMT;
-import freenet.io.comm.DisconnectedException;
-import freenet.io.comm.Message;
-import freenet.io.comm.MessageFilter;
-import freenet.io.xfer.BlockTransmitter;
-import freenet.io.xfer.PartiallyReceivedBlock;
-import freenet.keys.CHKBlock;
-import freenet.keys.CHKVerifyException;
-import freenet.keys.NodeCHK;
-import freenet.support.Logger;
-
-public final class InsertSender implements Runnable {
-
- private class AwaitingCompletion {
-
- /** Node we are waiting for response from */
- final PeerNode pn;
- /** We may be sending data to that node */
- BlockTransmitter bt;
- /** Have we received notice of the downstream success
- * or failure of dependant transfers from that node?
- * Includes timing out. */
- boolean receivedCompletionNotice = false;
- /** Timed out - didn't receive completion notice in
- * the allotted time?? */
- boolean completionTimedOut = false;
- /** Was the notification of successful transfer? */
- boolean completionSucceeded;
-
- /** Have we completed the immediate transfer? */
- boolean completedTransfer = false;
- /** Did it succeed? */
- boolean transferSucceeded = false;
-
- AwaitingCompletion(PeerNode pn, PartiallyReceivedBlock prb) {
- this.pn = pn;
- bt = new BlockTransmitter(node.usm, pn, uid, prb);
- Sender s = new Sender(this);
- Thread senderThread = new Thread(s, "Sender for "+uid+" to
"+pn.getPeer());
- senderThread.setDaemon(true);
- senderThread.start();
- }
-
- void completed(boolean timeout, boolean success) {
- synchronized(this) {
- if(timeout)
- completionTimedOut = true;
- else
- completionSucceeded = success;
- receivedCompletionNotice = true;
- notifyAll();
- }
- synchronized(nodesWaitingForCompletion) {
- nodesWaitingForCompletion.notifyAll();
- }
- if(!success) {
- synchronized(InsertSender.this) {
- transferTimedOut = true;
- InsertSender.this.notifyAll();
- }
- }
- }
-
- void completedTransfer(boolean success) {
- synchronized(this) {
- transferSucceeded = success;
- completedTransfer = true;
- notifyAll();
- }
- synchronized(nodesWaitingForCompletion) {
- nodesWaitingForCompletion.notifyAll();
- }
- if(!success) {
- synchronized(InsertSender.this) {
- transferTimedOut = true;
- InsertSender.this.notifyAll();
- }
- }
- }
- }
-
- public class Sender implements Runnable {
-
- final AwaitingCompletion completion;
- final BlockTransmitter bt;
-
- public Sender(AwaitingCompletion ac) {
- this.bt = ac.bt;
- this.completion = ac;
- }
-
- public void run() {
- try {
- bt.send();
- if(bt.failedDueToOverload()) {
- completion.completedTransfer(false);
- } else {
- completion.completedTransfer(true);
- }
- } catch (Throwable t) {
- completion.completedTransfer(false);
- Logger.error(this, "Caught "+t, t);
- }
- }
- }
-
- InsertSender(NodeCHK myKey, long uid, byte[] headers, short htl,
- PeerNode source, Node node, PartiallyReceivedBlock prb, boolean
fromStore, double closestLocation) {
- this.myKey = myKey;
- this.target = myKey.toNormalizedDouble();
- this.uid = uid;
- this.headers = headers;
- this.htl = htl;
- this.source = source;
- this.node = node;
- this.prb = prb;
- this.fromStore = fromStore;
- this.closestLocation = closestLocation;
- this.startTime = System.currentTimeMillis();
- this.nodesWaitingForCompletion = new Vector();
- Thread t = new Thread(this, "InsertSender for UID "+uid+" on
"+node.portNumber+" at "+System.currentTimeMillis());
- t.setDaemon(true);
- t.start();
- }
-
- // Constants
- static final int ACCEPTED_TIMEOUT = 10000;
- static final int SEARCH_TIMEOUT = 60000;
- static final int TRANSFER_COMPLETION_TIMEOUT = 120000;
-
- // Basics
- final NodeCHK myKey;
- final double target;
- final long uid;
- short htl;
- final PeerNode source;
- final Node node;
- final byte[] headers; // received BEFORE creation => we handle Accepted
elsewhere
- final PartiallyReceivedBlock prb;
- final boolean fromStore;
- private boolean receiveFailed = false;
- final double closestLocation;
- final long startTime;
- private boolean sentRequest;
-
- /** List of nodes we are waiting for either a transfer completion
- * notice or a transfer completion from. */
- private Vector nodesWaitingForCompletion;
-
- /** Have all transfers completed and all nodes reported completion status?
*/
- private boolean allTransfersCompleted = false;
-
- /** Has a transfer timed out, either directly or downstream? */
- private boolean transferTimedOut = false;
-
- /** Runnable which waits for completion of all transfers */
- private CompletionWaiter cw = null;
-
- /** Time at which we set status to a value other than NOT_FINISHED */
- private long setStatusTime = -1;
-
-
- private int status = -1;
- /** Still running */
- static final int NOT_FINISHED = -1;
- /** Successful insert */
- static final int SUCCESS = 0;
- /** Route not found */
- static final int ROUTE_NOT_FOUND = 1;
- /** Internal error */
- static final int INTERNAL_ERROR = 3;
- /** Timed out waiting for response */
- static final int TIMED_OUT = 4;
- /** Locally Generated a RejectedOverload */
- static final int GENERATED_REJECTED_OVERLOAD = 5;
- /** Could not get off the node at all! */
- static final int ROUTE_REALLY_NOT_FOUND = 6;
-
- public String toString() {
- return super.toString()+" for "+uid;
- }
-
- public void run() {
- short origHTL = htl;
- try {
- HashSet nodesRoutedTo = new HashSet();
- HashSet nodesNotIgnored = new HashSet();
-
- while(true) {
- if(receiveFailed) return; // don't need to set status as killed by
InsertHandler
-
- if(htl == 0) {
- // Send an InsertReply back
- finish(SUCCESS, null);
- return;
- }
-
- // Route it
- PeerNode next;
- // Can backtrack, so only route to nodes closer than we are to
target.
- double nextValue;
- synchronized(node.peers) {
- next = node.peers.closerPeer(source, nodesRoutedTo,
nodesNotIgnored, target, true);
- if(next != null)
- nextValue = next.getLocation().getValue();
- else
- nextValue = -1.0;
- }
-
- if(next == null) {
- // Backtrack
- finish(ROUTE_NOT_FOUND, null);
- return;
- }
- Logger.minor(this, "Routing insert to "+next);
- nodesRoutedTo.add(next);
-
- if(Math.abs(target - nextValue) > Math.abs(target -
closestLocation)) {
- Logger.minor(this, "Backtracking: target="+target+"
next="+nextValue+" closest="+closestLocation);
- htl = node.decrementHTL(source, htl);
- }
-
- Message req = DMT.createFNPInsertRequest(uid, htl, myKey,
closestLocation);
-
- // Wait for ack or reject... will come before even a locally
generated DataReply
-
- MessageFilter mfAccepted =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPAccepted);
- MessageFilter mfRejectedLoop =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedLoop);
- MessageFilter mfRejectedOverload =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedOverload);
-
- // mfRejectedOverload must be the last thing in the or
- // So its or pointer remains null
- // Otherwise we need to recreate it below
- mfRejectedOverload.clearOr();
- MessageFilter mf =
mfAccepted.or(mfRejectedLoop.or(mfRejectedOverload));
-
- // Send to next node
-
- next.send(req);
- sentRequest = true;
-
- if(receiveFailed) return; // don't need to set status as killed by
InsertHandler
- Message msg = null;
-
- /*
- * Because messages may be re-ordered, it is
- * entirely possible that we get a non-local RejectedOverload,
- * followed by an Accepted. So we must loop here.
- */
-
- while (true) {
-
- try {
- msg = node.usm.waitFor(mf);
- } catch (DisconnectedException e) {
- Logger.normal(this, "Disconnected from
" + next
- + " while waiting for
Accepted");
- break;
- }
-
- if (receiveFailed)
- return; // don't need to set status as
killed by InsertHandler
-
- if (msg == null) {
- // Terminal overload
- // Try to propagate back to source
- Logger.minor(this, "Timeout");
- next.localRejectedOverload();
- finish(TIMED_OUT, next);
- return;
- }
-
- if (msg.getSpec() == DMT.FNPRejectedOverload) {
- // Non-fatal - probably still have time
left
- if (msg.getBoolean(DMT.IS_LOCAL)) {
- next.localRejectedOverload();
- Logger.minor(this,
-
"Local RejectedOverload, moving on to next peer");
- // Give up on this one, try
another
- break;
- } else {
- forwardRejectedOverload();
- }
- continue;
- }
-
- if (msg.getSpec() == DMT.FNPRejectedLoop) {
- next.successNotOverload();
- // Loop - we don't want to send the
data to this one
- break;
- }
-
- if (msg.getSpec() != DMT.FNPAccepted) {
- Logger.error(this,
- "Unexpected message
waiting for Accepted: "
- + msg);
- break;
- }
- // Otherwise is an FNPAccepted
- break;
- }
-
- if(msg == null || msg.getSpec() != DMT.FNPAccepted) continue;
-
- Logger.minor(this, "Got Accepted on "+this);
-
- // Send them the data.
- // Which might be the new data resulting from a collision...
-
- Message dataInsert;
- PartiallyReceivedBlock prbNow;
- prbNow = prb;
- dataInsert = DMT.createFNPDataInsert(uid, headers);
- /** What are we waiting for now??:
- * - FNPRouteNotFound - couldn't exhaust HTL, but send us the
- * data anyway please
- * - FNPInsertReply - used up all HTL, yay
- * - FNPRejectOverload - propagating an overload error :(
- * - FNPDataFound - target already has the data, and the data is
- * an SVK/SSK/KSK, therefore could be different to what we are
- * inserting.
- */
-
- MessageFilter mfInsertReply =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(SEARCH_TIMEOUT).setType(DMT.FNPInsertReply);
- mfRejectedOverload.setTimeout(SEARCH_TIMEOUT);
- mfRejectedOverload.clearOr();
- MessageFilter mfRouteNotFound =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(SEARCH_TIMEOUT).setType(DMT.FNPRouteNotFound);
- MessageFilter mfDataInsertRejected =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(SEARCH_TIMEOUT).setType(DMT.FNPDataInsertRejected);
- MessageFilter mfTimeout =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(SEARCH_TIMEOUT).setType(DMT.FNPRejectedTimeout);
-
- mf =
mfInsertReply.or(mfRouteNotFound.or(mfDataInsertRejected.or(mfTimeout.or(mfRejectedOverload))));
-
- Logger.minor(this, "Sending DataInsert");
- if(receiveFailed) return;
- next.send(dataInsert);
-
- Logger.minor(this, "Sending data");
- if(receiveFailed) return;
- AwaitingCompletion ac = new AwaitingCompletion(next, prbNow);
- synchronized(nodesWaitingForCompletion) {
- nodesWaitingForCompletion.add(ac);
- nodesWaitingForCompletion.notifyAll();
- }
- makeCompletionWaiter();
-
- while (true) {
-
- if (receiveFailed)
- return;
-
- try {
- msg = node.usm.waitFor(mf);
- } catch (DisconnectedException e) {
- Logger.normal(this, "Disconnected from
" + next
- + " while waiting for
InsertReply on " + this);
- break;
- }
- if (receiveFailed)
- return;
-
- if (msg == null || msg.getSpec() ==
DMT.FNPRejectedTimeout) {
- // Timeout :(
- // Fairly serious problem
- Logger.error(this, "Timeout (" + msg
- + ") after Accepted in
insert");
- // Terminal overload
- // Try to propagate back to source
- next.localRejectedOverload();
- finish(TIMED_OUT, next);
- return;
- }
-
- if (msg.getSpec() == DMT.FNPRejectedOverload) {
- // Probably non-fatal, if so, we have
time left, can try next one
- if (msg.getBoolean(DMT.IS_LOCAL)) {
- next.localRejectedOverload();
- Logger.minor(this,
- "Local
RejectedOverload, moving on to next peer");
- // Give up on this one, try
another
- break;
- } else {
- forwardRejectedOverload();
- }
- continue; // Wait for any further
response
- }
-
- if (msg.getSpec() == DMT.FNPRouteNotFound) {
- Logger.minor(this, "Rejected: RNF");
- short newHtl = msg.getShort(DMT.HTL);
- if (htl > newHtl)
- htl = newHtl;
- // Finished as far as this node is
concerned
- next.successNotOverload();
- break;
- }
-
- if (msg.getSpec() == DMT.FNPDataInsertRejected)
{
- next.successNotOverload();
- short reason = msg
-
.getShort(DMT.DATA_INSERT_REJECTED_REASON);
- Logger.minor(this, "DataInsertRejected:
" + reason);
- if (reason ==
DMT.DATA_INSERT_REJECTED_VERIFY_FAILED) {
- if (fromStore) {
- // That's odd...
-
Logger.error(this,"Verify failed on next node "
- + next
+ " for DataInsert but we were sending from the store!");
- } else {
- try {
- if
(!prb.allReceived())
-
Logger.error(this,
-
"Did not receive all packets but next node says invalid anyway!");
- else {
- //
Check the data
- new
CHKBlock(prb.getBlock(), headers,
-
myKey);
-
Logger.error(this,
-
"Verify failed on " + next
-
+ " but data was valid!");
- }
- } catch
(CHKVerifyException e) {
- Logger
-
.normal(this,
-
"Verify failed because data was invalid");
- }
- }
- break; // What else can we do?
- } else if (reason ==
DMT.DATA_INSERT_REJECTED_RECEIVE_FAILED) {
- if (receiveFailed) {
- Logger.minor(this,
"Failed to receive data, so failed to send data");
- } else {
- if (prb.allReceived()) {
-
Logger.error(this, "Received all data but send failed to " + next);
- } else {
- if
(prb.isAborted()) {
-
Logger.normal(this, "Send failed: aborted: " + prb.getAbortReason() + ": " +
prb.getAbortDescription());
- } else
-
Logger.normal(this, "Send failed; have not yet received all data but not
aborted: " + next);
- }
- }
- break;
- }
- Logger.error(this, "DataInsert
rejected! Reason="
- +
DMT.getDataInsertRejectedReason(reason));
- }
-
- if (msg.getSpec() != DMT.FNPInsertReply) {
- Logger.error(this, "Unknown reply: " +
msg);
- finish(INTERNAL_ERROR, next);
- }
-
- // Our task is complete
- next.successNotOverload();
- finish(SUCCESS, next);
- return;
- }
- }
- } catch (Throwable t) {
- Logger.error(this, "Caught "+t, t);
- if(status == NOT_FINISHED)
- finish(INTERNAL_ERROR, null);
- } finally {
- node.completed(uid);
- node.removeInsertSender(myKey, origHTL, this);
- }
- }
-
- private boolean hasForwardedRejectedOverload = false;
-
- synchronized boolean receivedRejectedOverload() {
- return hasForwardedRejectedOverload;
- }
-
- /** Forward RejectedOverload to the request originator.
- * DO NOT CALL if have a *local* RejectedOverload.
- */
- private synchronized void forwardRejectedOverload() {
- if(hasForwardedRejectedOverload) return;
- hasForwardedRejectedOverload = true;
- notifyAll();
- }
-
- private void finish(int code, PeerNode next) {
- Logger.minor(this, "Finished: "+code+" on "+this, new
Exception("debug"));
- if(status != NOT_FINISHED)
- throw new IllegalStateException("finish() called with "+code+"
when was already "+status);
-
- setStatusTime = System.currentTimeMillis();
-
- if(code == ROUTE_NOT_FOUND && !sentRequest)
- code = ROUTE_REALLY_NOT_FOUND;
-
- status = code;
-
- synchronized(this) {
- notifyAll();
- }
-
- Logger.minor(this, "Set status code: "+getStatusString());
-
- // Now wait for transfers, or for downstream transfer notifications.
-
- synchronized(this) {
- if(cw != null) {
- while(!allTransfersCompleted) {
- try {
- wait(10*1000);
- } catch (InterruptedException e) {
- // Try again
- }
- }
- } else {
- // There weren't any transfers
- allTransfersCompleted = true;
- }
- notifyAll();
- }
-
- Logger.minor(this, "Returning from finish()");
- }
-
- public int getStatus() {
- return status;
- }
-
- public short getHTL() {
- return htl;
- }
-
- /**
- * Called by InsertHandler to notify that the receive has
- * failed.
- */
- public void receiveFailed() {
- receiveFailed = true;
- }
-
- /**
- * @return The current status as a string
- */
- public String getStatusString() {
- if(status == SUCCESS)
- return "SUCCESS";
- if(status == ROUTE_NOT_FOUND)
- return "ROUTE NOT FOUND";
- if(status == NOT_FINISHED)
- return "NOT FINISHED";
- if(status == INTERNAL_ERROR)
- return "INTERNAL ERROR";
- if(status == TIMED_OUT)
- return "TIMED OUT";
- if(status == GENERATED_REJECTED_OVERLOAD)
- return "GENERATED REJECTED OVERLOAD";
- if(status == ROUTE_REALLY_NOT_FOUND)
- return "ROUTE REALLY NOT FOUND";
- return "UNKNOWN STATUS CODE: "+status;
- }
-
- public boolean sentRequest() {
- return sentRequest;
- }
-
- private synchronized void makeCompletionWaiter() {
- if(cw == null) {
- cw = new CompletionWaiter();
- Thread t = new Thread(cw, "Completion waiter for "+uid);
- t.setDaemon(true);
- t.start();
- }
- }
-
- private class CompletionWaiter implements Runnable {
-
- public void run() {
- Logger.minor(this, "Starting "+this);
-outer: while(true) {
- AwaitingCompletion[] waiters;
- synchronized(nodesWaitingForCompletion) {
- waiters = new
AwaitingCompletion[nodesWaitingForCompletion.size()];
- waiters = (AwaitingCompletion[])
nodesWaitingForCompletion.toArray(waiters);
- }
-
- // First calculate the timeout
-
- int timeout;
- boolean noTimeLeft = false;
-
- long now = System.currentTimeMillis();
- if(status == NOT_FINISHED) {
- // Wait 5 seconds, then try again
- timeout = 5000;
- } else {
- // Completed, wait for everything
- timeout = (int)Math.min(Integer.MAX_VALUE,
(setStatusTime + TRANSFER_COMPLETION_TIMEOUT) - now);
- }
- if(timeout <= 0) {
- noTimeLeft = true;
- timeout = 1;
- }
-
- MessageFilter mf = null;
- for(int i=0;i<waiters.length;i++) {
- AwaitingCompletion awc = waiters[i];
- if(!awc.pn.isConnected()) {
- Logger.normal(this, "Disconnected:
"+awc.pn+" in "+InsertSender.this);
- continue;
- }
- if(!awc.receivedCompletionNotice) {
- MessageFilter m =
-
MessageFilter.create().setField(DMT.UID,
uid).setType(DMT.FNPInsertTransfersCompleted).setSource(awc.pn).setTimeout(timeout);
- if(mf == null)
- mf = m;
- else
- mf = m.or(mf);
- Logger.minor(this, "Waiting for
"+awc.pn.getPeer());
- }
- }
-
- if(mf == null) {
- if(status != NOT_FINISHED) {
- if(nodesWaitingForCompletion.size() !=
waiters.length) {
- // Added another one
- Logger.minor(this, "Looping
(mf==null): waiters="+waiters.length+" but
waiting="+nodesWaitingForCompletion.size());
- continue;
- }
- if(waitForCompletedTransfers(waiters,
timeout, noTimeLeft)) {
- synchronized(InsertSender.this)
{
- allTransfersCompleted =
true;
-
InsertSender.this.notifyAll();
- }
- return;
- }
- if(noTimeLeft) {
- for(int
i=0;i<waiters.length;i++) {
-
if(!waiters[i].pn.isConnected()) continue;
-
if(!waiters[i].completedTransfer) {
-
waiters[i].completedTransfer(false);
- }
- }
- synchronized(InsertSender.this)
{
- allTransfersCompleted =
true;
-
InsertSender.this.notifyAll();
- }
- return;
- }
- // Otherwise, not finished, go back
around loop
- continue;
- } else {
- // Still waiting for request
completion, so more may be added
- synchronized(nodesWaitingForCompletion)
{
- try {
-
nodesWaitingForCompletion.wait(timeout);
- } catch (InterruptedException
e) {
- // Go back around the
loop
- }
- }
- }
- continue;
- } else {
- Message m;
- try {
- m = node.usm.waitFor(mf);
- } catch (DisconnectedException e) {
- // Which one? I have no idea.
- // Go around the loop again.
- continue;
- }
- if(m != null) {
- // Process message
- PeerNode pn = (PeerNode) m.getSource();
- boolean processed = false;
- for(int i=0;i<waiters.length;i++) {
- PeerNode p = waiters[i].pn;
- if(p == pn) {
- boolean anyTimedOut =
m.getBoolean(DMT.ANY_TIMED_OUT);
-
waiters[i].completed(false, !anyTimedOut);
- if(anyTimedOut) {
-
synchronized(InsertSender.this) {
-
if(!transferTimedOut) {
-
transferTimedOut = true;
-
InsertSender.this.notifyAll();
- }
- }
- }
- processed = true;
- break;
- }
- }
- if(!processed) {
- Logger.error(this, "Did not
process message: "+m+" on "+this);
- }
- } else {
- if(nodesWaitingForCompletion.size() >
waiters.length) {
- // Added another one
- Logger.minor(this, "Looping:
waiters="+waiters.length+" but waiting="+nodesWaitingForCompletion.size());
- continue;
- }
- if(noTimeLeft) {
- Logger.minor(this, "Overall
timeout on "+InsertSender.this);
- for(int
i=0;i<waiters.length;i++) {
-
if(!waiters[i].pn.isConnected()) continue;
-
if(!waiters[i].receivedCompletionNotice)
-
waiters[i].completed(false, false);
-
if(!waiters[i].completedTransfer)
-
waiters[i].completedTransfer(false);
- }
- synchronized(InsertSender.this)
{
- transferTimedOut = true;
- allTransfersCompleted =
true;
-
InsertSender.this.notifyAll();
- }
- return;
- }
- }
- }
- }
- }
-
- /** @return True if all transfers have completed, false
otherwise. */
- private boolean waitForCompletedTransfers(AwaitingCompletion[]
waiters, int timeout, boolean noTimeLeft) {
- // MAYBE all done
- boolean completedTransfers = true;
- synchronized(nodesWaitingForCompletion) {
- for(int i=0;i<waiters.length;i++) {
- if(!waiters[i].pn.isConnected())
continue;
- if(!waiters[i].completedTransfer) {
- completedTransfers = false;
- break;
- }
- }
- if(!completedTransfers) {
- try {
- if(!noTimeLeft) {
-
nodesWaitingForCompletion.wait(timeout);
- } else {
- // Timed out
- }
- completedTransfers = true;
- for(int
i=0;i<waiters.length;i++) {
-
if(!waiters[i].pn.isConnected()) continue;
-
if(!waiters[i].completedTransfer) {
-
completedTransfers = false;
- break;
- }
- }
- } catch (InterruptedException e) {
- // Ignore
- }
- }
- }
- if(completedTransfers) {
- // All done!
- Logger.minor(this, "Completed,
status="+getStatusString()+", nothing left to wait for.");
- synchronized(InsertSender.this) {
- allTransfersCompleted = true;
- InsertSender.this.notifyAll();
- }
- return true;
- } else return false;
- }
-
- public String toString() {
- return super.toString()+" for "+uid;
- }
- }
-
- public boolean completed() {
- return allTransfersCompleted;
- }
-
- public boolean anyTransfersFailed() {
- return transferTimedOut;
- }
-}
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2006-01-06 19:26:42 UTC (rev
7769)
+++ trunk/freenet/src/freenet/node/Node.java 2006-01-06 21:18:26 UTC (rev
7770)
@@ -47,6 +47,7 @@
import freenet.keys.ClientCHKBlock;
import freenet.keys.ClientKey;
import freenet.keys.ClientKeyBlock;
+import freenet.keys.Key;
import freenet.keys.NodeCHK;
import freenet.keys.SSKBlock;
import freenet.store.BerkeleyDBFreenetStore;
@@ -128,7 +129,7 @@
private final HashMap requestSenders;
/** RequestSender's currently transferring, by key */
private final HashMap transferringRequestSenders;
- /** InsertSender's currently running, by KeyHTLPair */
+ /** CHKInsertSender's currently running, by KeyHTLPair */
private final HashMap insertSenders;
/** IP address detector */
private final IPAddressDetector ipDetector;
@@ -555,9 +556,9 @@
public void realPutCHK(ClientCHKBlock block, boolean cache) throws
LowLevelPutException {
byte[] data = block.getData();
- byte[] headers = block.getHeader();
+ byte[] headers = block.getHeaders();
PartiallyReceivedBlock prb = new
PartiallyReceivedBlock(PACKETS_IN_BLOCK, PACKET_SIZE, data);
- InsertSender is;
+ CHKInsertSender is;
long uid = random.nextLong();
if(!lockUID(uid)) {
Logger.error(this, "Could not lock UID just randomly generated:
"+uid+" - probably indicates broken PRNG");
@@ -579,14 +580,14 @@
// Wait for status
while(true) {
synchronized(is) {
- if(is.getStatus() == InsertSender.NOT_FINISHED) {
+ if(is.getStatus() == CHKInsertSender.NOT_FINISHED) {
try {
is.wait(5*1000);
} catch (InterruptedException e) {
// Ignore
}
}
- if(is.getStatus() != InsertSender.NOT_FINISHED) break;
+ if(is.getStatus() != CHKInsertSender.NOT_FINISHED)
break;
}
if((!hasForwardedRejectedOverload) &&
is.receivedRejectedOverload()) {
hasForwardedRejectedOverload = true;
@@ -615,8 +616,8 @@
// Finished?
if(!hasForwardedRejectedOverload) {
// Is it ours? Did we send a request?
- if(is.sentRequest() && is.uid == uid && (is.getStatus() ==
InsertSender.ROUTE_NOT_FOUND
- || is.getStatus() == InsertSender.SUCCESS)) {
+ if(is.sentRequest() && is.uid == uid && (is.getStatus() ==
CHKInsertSender.ROUTE_NOT_FOUND
+ || is.getStatus() == CHKInsertSender.SUCCESS)) {
// It worked!
long endTime = System.currentTimeMillis();
long len = endTime - startTime;
@@ -624,33 +625,33 @@
}
}
- if(is.getStatus() == InsertSender.SUCCESS) {
+ if(is.getStatus() == CHKInsertSender.SUCCESS) {
Logger.normal(this, "Succeeded inserting "+block);
return;
} else {
int status = is.getStatus();
String msg = "Failed inserting "+block+" :
"+is.getStatusString();
- if(status == InsertSender.ROUTE_NOT_FOUND)
+ if(status == CHKInsertSender.ROUTE_NOT_FOUND)
msg += " - this is normal on small networks; the data
will still be propagated, but it can't find the 20+ nodes needed for full
success";
- if(is.getStatus() != InsertSender.ROUTE_NOT_FOUND)
+ if(is.getStatus() != CHKInsertSender.ROUTE_NOT_FOUND)
Logger.error(this, msg);
else
Logger.normal(this, msg);
switch(is.getStatus()) {
- case InsertSender.NOT_FINISHED:
+ case CHKInsertSender.NOT_FINISHED:
Logger.error(this, "IS still running in putCHK!: "+is);
throw new
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
- case InsertSender.GENERATED_REJECTED_OVERLOAD:
- case InsertSender.TIMED_OUT:
+ case CHKInsertSender.GENERATED_REJECTED_OVERLOAD:
+ case CHKInsertSender.TIMED_OUT:
throw new
LowLevelPutException(LowLevelPutException.REJECTED_OVERLOAD);
- case InsertSender.ROUTE_NOT_FOUND:
+ case CHKInsertSender.ROUTE_NOT_FOUND:
throw new
LowLevelPutException(LowLevelPutException.ROUTE_NOT_FOUND);
- case InsertSender.ROUTE_REALLY_NOT_FOUND:
+ case CHKInsertSender.ROUTE_REALLY_NOT_FOUND:
throw new
LowLevelPutException(LowLevelPutException.ROUTE_REALLY_NOT_FOUND);
- case InsertSender.INTERNAL_ERROR:
+ case CHKInsertSender.INTERNAL_ERROR:
throw new
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
default:
- Logger.error(this, "Unknown InsertSender code in
putCHK: "+is.getStatus()+" on "+is);
+ Logger.error(this, "Unknown CHKInsertSender code in
putCHK: "+is.getStatus()+" on "+is);
throw new
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
}
}
@@ -793,9 +794,9 @@
}
static class KeyHTLPair {
- final NodeCHK key;
+ final Key key;
final short htl;
- KeyHTLPair(NodeCHK key, short htl) {
+ KeyHTLPair(Key key, short htl) {
this.key = key;
this.htl = htl;
}
@@ -864,11 +865,11 @@
}
/**
- * Remove an InsertSender from the map.
+ * Remove an CHKInsertSender from the map.
*/
- public void removeInsertSender(NodeCHK key, short htl, InsertSender
sender) {
+ public void removeInsertSender(Key key, short htl, AnyInsertSender sender)
{
KeyHTLPair kh = new KeyHTLPair(key, htl);
- InsertSender is = (InsertSender) insertSenders.remove(kh);
+ AnyInsertSender is = (AnyInsertSender) insertSenders.remove(kh);
if(is != sender) {
Logger.error(this, "Removed "+is+" should be "+sender+" for
"+key+","+htl+" in removeInsertSender");
}
@@ -900,28 +901,28 @@
}
/**
- * Fetch or create an InsertSender for a given key/htl.
+ * Fetch or create an CHKInsertSender for a given key/htl.
* @param key The key to be inserted.
* @param htl The current HTL. We can't coalesce inserts across
* HTL's.
* @param uid The UID of the caller's request chain, or a new
* one. This is obviously not used if there is already an
- * InsertSender running.
+ * CHKInsertSender running.
* @param source The node that sent the InsertRequest, or null
* if it originated locally.
*/
- public synchronized InsertSender makeInsertSender(NodeCHK key, short htl,
long uid, PeerNode source,
+ public synchronized CHKInsertSender makeInsertSender(NodeCHK key, short
htl, long uid, PeerNode source,
byte[] headers, PartiallyReceivedBlock prb, boolean fromStore,
double closestLoc, boolean cache) {
Logger.minor(this,
"makeInsertSender("+key+","+htl+","+uid+","+source+",...,"+fromStore);
KeyHTLPair kh = new KeyHTLPair(key, htl);
- InsertSender is = (InsertSender) insertSenders.get(kh);
+ CHKInsertSender is = (CHKInsertSender) insertSenders.get(kh);
if(is != null) {
Logger.minor(this, "Found "+is+" for "+kh);
return is;
}
if(fromStore && !cache)
throw new IllegalArgumentException("From store = true but cache
= false !!!");
- is = new InsertSender(key, uid, headers, htl, source, this, prb,
fromStore, closestLoc);
+ is = new CHKInsertSender(key, uid, headers, htl, source, this, prb,
fromStore, closestLoc);
Logger.minor(this, is.toString()+" for "+kh.toString());
insertSenders.put(kh, is);
return is;
@@ -962,7 +963,7 @@
// Dump
Iterator i = insertSenders.values().iterator();
while(i.hasNext()) {
- InsertSender s = (InsertSender) i.next();
+ CHKInsertSender s = (CHKInsertSender) i.next();
sb.append(s.uid);
sb.append(": ");
sb.append(s.getStatusString());
Modified: trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java
===================================================================
--- trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java
2006-01-06 19:26:42 UTC (rev 7769)
+++ trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java
2006-01-06 21:18:26 UTC (rev 7770)
@@ -177,11 +177,11 @@
block = ClientCHKBlock.encode(data, false, false, (short)-1,
0);
ClientCHK chk = block.getClientKey();
byte[] encData = block.getData();
- byte[] encHeaders = block.getHeader();
+ byte[] encHeaders = block.getHeaders();
ClientCHKBlock newBlock = new ClientCHKBlock(encData,
encHeaders, chk, true);
Logger.error(RealNodeRequestInsertTest.class, "Decoded: "+new
String(newBlock.memoryDecode()));
Logger.error(RealNodeRequestInsertTest.class,"CHK:
"+chk.getURI());
- Logger.error(RealNodeRequestInsertTest.class,"Headers:
"+HexUtil.bytesToHex(block.getHeader()));
+ Logger.error(RealNodeRequestInsertTest.class,"Headers:
"+HexUtil.bytesToHex(block.getHeaders()));
randomNode.putCHK(block, starters[node1], true);
Logger.error(RealNodeRequestInsertTest.class, "Inserted to
"+node1);
Logger.error(RealNodeRequestInsertTest.class, "Data:
"+Fields.hashCode(encData)+", Headers: "+Fields.hashCode(encHeaders));
Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java 2006-01-06 19:26:42 UTC
(rev 7769)
+++ trunk/freenet/src/freenet/node/RequestHandler.java 2006-01-06 21:18:26 UTC
(rev 7770)
@@ -53,7 +53,7 @@
Object o = node.makeRequestSender(key, htl, uid, source, closestLoc,
false, true);
if(o instanceof CHKBlock) {
CHKBlock block = (CHKBlock) o;
- Message df = DMT.createFNPDataFound(uid, block.getHeader());
+ Message df = DMT.createFNPDataFound(uid, block.getHeaders());
source.send(df);
PartiallyReceivedBlock prb =
new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK,
Node.PACKET_SIZE, block.getData());
Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2006-01-06 19:26:42 UTC (rev
7769)
+++ trunk/freenet/src/freenet/node/Version.java 2006-01-06 21:18:26 UTC (rev
7770)
@@ -20,7 +20,7 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- public static final int buildNumber = 323;
+ public static final int buildNumber = 324;
/** Oldest build of Fred we will talk to */
public static final int lastGoodBuild = 318;
Modified: trunk/freenet/src/freenet/store/BaseFreenetStore.java
===================================================================
--- trunk/freenet/src/freenet/store/BaseFreenetStore.java 2006-01-06
19:26:42 UTC (rev 7769)
+++ trunk/freenet/src/freenet/store/BaseFreenetStore.java 2006-01-06
21:18:26 UTC (rev 7770)
@@ -95,7 +95,7 @@
*/
public synchronized void put(CHKBlock block) throws IOException {
byte[] data = block.getData();
- byte[] headers = block.getHeader();
+ byte[] headers = block.getHeaders();
int hlen = headers.length;
if(data.length != DATA_BLOCK_SIZE || hlen > HEADER_BLOCK_SIZE-2)
throw new IllegalArgumentException("Too big - data:
"+data.length+" should be "+
Modified: trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java
===================================================================
--- trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java 2006-01-06
19:26:42 UTC (rev 7769)
+++ trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java 2006-01-06
21:18:26 UTC (rev 7770)
@@ -212,7 +212,7 @@
byte[] routingkey = ((NodeCHK)block.getKey()).getRoutingKey();
byte[] data = block.getData();
- byte[] header = block.getHeader();
+ byte[] header = block.getHeaders();
if(data.length!=dataBlockSize) {
Logger.minor(this, "This data is "+data.length+" bytes. Should
be "+dataBlockSize);