Author: toad
Date: 2006-01-06 22:41:05 +0000 (Fri, 06 Jan 2006)
New Revision: 7772
Added:
trunk/freenet/src/freenet/node/SSKInsertHandler.java
trunk/freenet/src/freenet/node/SSKInsertSender.java
trunk/freenet/src/freenet/support/ImmutableByteArrayWrapper.java
Modified:
trunk/freenet/src/freenet/crypt/DSAPublicKey.java
trunk/freenet/src/freenet/io/comm/DMT.java
trunk/freenet/src/freenet/keys/ClientSSKBlock.java
trunk/freenet/src/freenet/keys/Key.java
trunk/freenet/src/freenet/keys/SSKBlock.java
trunk/freenet/src/freenet/node/CHKInsertSender.java
trunk/freenet/src/freenet/node/InsertHandler.java
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/Version.java
trunk/freenet/src/freenet/store/BaseFreenetStore.java
trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java
trunk/freenet/src/freenet/store/FreenetStore.java
Log:
326:
More SSKs (InsertHandler).
Also commit files missed last time.
Modified: trunk/freenet/src/freenet/crypt/DSAPublicKey.java
===================================================================
--- trunk/freenet/src/freenet/crypt/DSAPublicKey.java 2006-01-06 22:12:27 UTC (rev 7771)
+++ trunk/freenet/src/freenet/crypt/DSAPublicKey.java 2006-01-06 22:41:05 UTC (rev 7772)
@@ -38,7 +38,17 @@
this(g,g.getG().modPow(p.getX(), g.getP()));
}
- public BigInteger getY() {
+ public DSAPublicKey(InputStream is) throws IOException {
+ group=(DSAGroup) DSAGroup.read(is);
+ y=Util.readMPI(is);
+ this.yAsHexString = HexUtil.biToHex(y);
+ }
+
+ public DSAPublicKey(byte[] pubkeyAsBytes) throws IOException {
+ this(new ByteArrayInputStream(pubkeyAsBytes));
+ }
+
+ public BigInteger getY() {
return y;
}
@@ -95,9 +105,7 @@
// }
//
public static CryptoKey read(InputStream i) throws IOException {
- BigInteger y=Util.readMPI(i);
- DSAGroup g=(DSAGroup)CryptoKey.read(i);
- return new DSAPublicKey(g, y);
+ return new DSAPublicKey(i);
}
public int keyId() {
Modified: trunk/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/DMT.java 2006-01-06 22:12:27 UTC (rev 7771)
+++ trunk/freenet/src/freenet/io/comm/DMT.java 2006-01-06 22:41:05 UTC (rev 7772)
@@ -662,6 +662,7 @@
public static final short DATA_INSERT_REJECTED_VERIFY_FAILED = 1;
public static final short DATA_INSERT_REJECTED_RECEIVE_FAILED = 2;
+ public static final short DATA_INSERT_REJECTED_SSK_ERROR = 3;
public static final String getDataInsertRejectedReason(short reason) {
if(reason == DATA_INSERT_REJECTED_VERIFY_FAILED)
@@ -674,45 +675,43 @@
public static final MessageType FNPSSKInsertRequest = new
MessageType("FNPSSKInsertRequest") {{
addField(UID, Long.class);
addField(HTL, Short.class);
- addField(KEY, NodeSSK.class);
+ addField(FREENET_ROUTING_KEY, NodeSSK.class);
addField(NEAREST_LOCATION, Double.class);
addField(BLOCK_HEADERS, ShortBuffer.class);
addField(PUBKEY_HASH, ShortBuffer.class);
+ addField(DATA, ShortBuffer.class);
}};
- public static Message createFNPSSKInsertRequest(long uid, short htl,
NodeSSK myKey, double closestLocation, byte[] headers, byte[] pubKeyHash) {
+ public static Message createFNPSSKInsertRequest(long uid, short htl,
NodeSSK myKey, double closestLocation, byte[] headers, byte[] data, byte[]
pubKeyHash) {
Message msg = new Message(FNPSSKInsertRequest);
msg.set(UID, uid);
msg.set(HTL, htl);
- msg.set(KEY, myKey);
+ msg.set(FREENET_ROUTING_KEY, myKey);
msg.set(NEAREST_LOCATION, closestLocation);
msg.set(BLOCK_HEADERS, new ShortBuffer(headers));
msg.set(PUBKEY_HASH, new ShortBuffer(pubKeyHash));
+ msg.set(DATA, new ShortBuffer(data));
return msg;
}
public static final MessageType FNPSSKDataFound = new
MessageType("FNPSSKDataFound") {{
addField(UID, Long.class);
- addField(HTL, Short.class);
- addField(KEY, NodeSSK.class);
- addField(NEAREST_LOCATION, Double.class);
+ addField(FREENET_ROUTING_KEY, NodeSSK.class);
addField(BLOCK_HEADERS, ShortBuffer.class);
addField(PUBKEY_HASH, ShortBuffer.class);
+ addField(DATA, ShortBuffer.class);
}};
- public static Message createFNPSSKDataFound(long uid, short htl,
NodeSSK myKey, double closestLocation, byte[] headers, byte[] pubKeyHash) {
+ public static Message createFNPSSKDataFound(long uid, NodeSSK myKey,
byte[] headers, byte[] data, byte[] pubKeyHash) {
Message msg = new Message(FNPSSKDataFound);
msg.set(UID, uid);
- msg.set(HTL, htl);
- msg.set(KEY, myKey);
- msg.set(NEAREST_LOCATION, closestLocation);
+ msg.set(FREENET_ROUTING_KEY, myKey);
msg.set(BLOCK_HEADERS, new ShortBuffer(headers));
msg.set(PUBKEY_HASH, new ShortBuffer(pubKeyHash));
+ msg.set(DATA, new ShortBuffer(data));
return msg;
}
-
-
public static MessageType FNPSSKAccepted = new
MessageType("FNPSSKAccepted") {{
addField(UID, Long.class);
addField(NEED_PUB_KEY, Boolean.class);
Modified: trunk/freenet/src/freenet/keys/ClientSSKBlock.java
===================================================================
--- trunk/freenet/src/freenet/keys/ClientSSKBlock.java 2006-01-06 22:12:27 UTC (rev 7771)
+++ trunk/freenet/src/freenet/keys/ClientSSKBlock.java 2006-01-06 22:41:05 UTC (rev 7772)
@@ -22,7 +22,7 @@
private ClientSSK key;
public ClientSSKBlock(byte[] data, byte[] headers, ClientSSK key)
throws SSKVerifyException {
- super(data, headers, (NodeSSK) key.getNodeKey());
+ super(data, headers, (NodeSSK) key.getNodeKey(), false);
}
/**
Modified: trunk/freenet/src/freenet/keys/Key.java
===================================================================
--- trunk/freenet/src/freenet/keys/Key.java 2006-01-06 22:12:27 UTC (rev 7771)
+++ trunk/freenet/src/freenet/keys/Key.java 2006-01-06 22:41:05 UTC (rev 7772)
@@ -116,4 +116,8 @@
}
}
+ public byte[] getRoutingKey() {
+ return routingKey;
+ }
+
}
Modified: trunk/freenet/src/freenet/keys/SSKBlock.java
===================================================================
--- trunk/freenet/src/freenet/keys/SSKBlock.java 2006-01-06 22:12:27 UTC (rev 7771)
+++ trunk/freenet/src/freenet/keys/SSKBlock.java 2006-01-06 22:41:05 UTC (rev 7772)
@@ -54,7 +54,7 @@
* Initialize, and verify data, headers against key. Provided
* key must have a pubkey, or we throw.
*/
- public SSKBlock(byte[] data, byte[] headers, NodeSSK nodeKey) throws
SSKVerifyException {
+ public SSKBlock(byte[] data, byte[] headers, NodeSSK nodeKey, boolean
dontVerify) throws SSKVerifyException {
if(headers.length != TOTAL_HEADERS_LENGTH)
throw new
IllegalArgumentException("Headers.length="+headers.length+" should be
"+TOTAL_HEADERS_LENGTH);
this.data = data;
@@ -81,24 +81,28 @@
int x = 2;
if(x+SIG_R_LENGTH+SIG_S_LENGTH > headers.length)
throw new SSKVerifyException("Headers too short:
"+headers.length+" should be at least "+x+SIG_R_LENGTH+SIG_S_LENGTH);
- System.arraycopy(headers, x, bufR, 0, SIG_R_LENGTH);
+ if(!dontVerify)
+ System.arraycopy(headers, x, bufR, 0, SIG_R_LENGTH);
x+=SIG_R_LENGTH;
- System.arraycopy(headers, x, bufS, 0, SIG_S_LENGTH);
+ if(!dontVerify)
+ System.arraycopy(headers, x, bufS, 0, SIG_S_LENGTH);
x+=SIG_S_LENGTH;
// Compute the hash on the data
- md.update(data);
- byte[] dataHash = md.digest();
- md.update(dataHash);
- md.update(headers, x, headers.length - x);
- byte[] overallHash = md.digest();
- // Now verify it
- NativeBigInteger r = new NativeBigInteger(1, bufR);
- NativeBigInteger s = new NativeBigInteger(1, bufS);
- if(!DSA.verify(pubKey, new DSASignature(r, s), new
NativeBigInteger(1, overallHash))) {
- throw new SSKVerifyException("Signature verification
failed for node-level SSK");
+ if(!dontVerify) {
+ md.update(data);
+ byte[] dataHash = md.digest();
+ md.update(dataHash);
+ md.update(headers, x, headers.length - x);
+ byte[] overallHash = md.digest();
+ // Now verify it
+ NativeBigInteger r = new NativeBigInteger(1, bufR);
+ NativeBigInteger s = new NativeBigInteger(1, bufS);
+ if(!DSA.verify(pubKey, new DSASignature(r, s), new
NativeBigInteger(1, overallHash))) {
+ throw new SSKVerifyException("Signature
verification failed for node-level SSK");
+ }
+ if(headers.length < x+2+E_H_DOCNAME_LENGTH)
+ throw new SSKVerifyException("Headers too short
after sig verification: "+headers.length+" should be "+x+2+E_H_DOCNAME_LENGTH);
}
- if(headers.length < x+2+E_H_DOCNAME_LENGTH)
- throw new SSKVerifyException("Headers too short after
sig verification: "+headers.length+" should be "+x+2+E_H_DOCNAME_LENGTH);
symCipherIdentifier = (short)(((headers[x] & 0xff) << 8) +
(headers[x+1] & 0xff));
x+=2;
byte[] ehDocname = new byte[E_H_DOCNAME_LENGTH];
Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertSender.java 2006-01-06 22:12:27 UTC (rev 7771)
+++ trunk/freenet/src/freenet/node/CHKInsertSender.java 2006-01-06 22:41:05 UTC (rev 7772)
@@ -796,4 +796,12 @@
public boolean anyTransfersFailed() {
return transferTimedOut;
}
+
+ public byte[] getPubkeyHash() {
+ return headers;
+ }
+
+ public byte[] getHeaders() {
+ return headers;
+ }
}
Modified: trunk/freenet/src/freenet/node/InsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/InsertHandler.java 2006-01-06 22:12:27 UTC (rev 7771)
+++ trunk/freenet/src/freenet/node/InsertHandler.java 2006-01-06 22:41:05 UTC (rev 7772)
@@ -64,12 +64,27 @@
public void run() {
try {
+ realRun();
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t, t);
+ } finally {
+ Logger.minor(this, "Exiting InsertHandler.run() for "+uid);
+ node.unlockUID(uid);
+ }
+ }
+
+ private void realRun() {
runThread = Thread.currentThread();
// FIXME implement rate limiting or something!
// Send Accepted
Message accepted = DMT.createFNPAccepted(uid);
- source.send(accepted);
+ try {
+ source.send(accepted);
+ } catch (NotConnectedException e1) {
+ Logger.minor(this, "Lost connection to source");
+ return;
+ }
// Source will send us a DataInsert
@@ -87,17 +102,22 @@
Logger.minor(this, "Received "+msg);
if(msg == null) {
- if(source.isConnected() && startTime >
(source.timeLastConnected()+Node.HANDSHAKE_TIMEOUT*4))
- Logger.error(this, "Did not receive DataInsert on "+uid+" from
"+source+" !");
- Message tooSlow = DMT.createFNPRejectedTimeout(uid);
- source.sendAsync(tooSlow, null);
- Message m = DMT.createFNPInsertTransfersCompleted(uid, true);
- source.sendAsync(m, null);
- prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK,
Node.PACKET_SIZE);
- br = new BlockReceiver(node.usm, source, uid, prb);
- prb.abort(RetrievalException.NO_DATAINSERT, "No DataInsert");
- br.sendAborted(RetrievalException.NO_DATAINSERT, "No DataInsert");
- return;
+ try {
+ if(source.isConnected() && startTime >
(source.timeLastConnected()+Node.HANDSHAKE_TIMEOUT*4))
+ Logger.error(this, "Did not receive DataInsert
on "+uid+" from "+source+" !");
+ Message tooSlow = DMT.createFNPRejectedTimeout(uid);
+ source.sendAsync(tooSlow, null);
+ Message m = DMT.createFNPInsertTransfersCompleted(uid,
true);
+ source.sendAsync(m, null);
+ prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK,
Node.PACKET_SIZE);
+ br = new BlockReceiver(node.usm, source, uid, prb);
+ prb.abort(RetrievalException.NO_DATAINSERT, "No
DataInsert");
+ br.sendAborted(RetrievalException.NO_DATAINSERT, "No
DataInsert");
+ return;
+ } catch (NotConnectedException e) {
+ Logger.minor(this, "Lost connection to source");
+ return;
+ }
}
// We have a DataInsert
@@ -125,7 +145,11 @@
canCommit = true;
msg = DMT.createFNPInsertReply(uid);
sentSuccess = true;
- source.send(msg);
+ try {
+ source.send(msg);
+ } catch (NotConnectedException e) {
+ // Ignore
+ }
finish();
return;
}
@@ -170,7 +194,12 @@
receivedRejectedOverload = true;
// Forward it
Message m = DMT.createFNPRejectedOverload(uid, false);
- source.send(m);
+ try {
+ source.send(m);
+ } catch (NotConnectedException e) {
+ Logger.minor(this, "Lost connection to
source");
+ return;
+ }
}
int status = sender.getStatus();
@@ -195,7 +224,12 @@
status == CHKInsertSender.GENERATED_REJECTED_OVERLOAD ||
status == CHKInsertSender.INTERNAL_ERROR) {
msg = DMT.createFNPRejectedOverload(uid, true);
- source.send(msg);
+ try {
+ source.send(msg);
+ } catch (NotConnectedException e) {
+ Logger.minor(this, "Lost connection to
source");
+ return;
+ }
// Might as well store it anyway.
if(status == CHKInsertSender.TIMED_OUT ||
status ==
CHKInsertSender.GENERATED_REJECTED_OVERLOAD)
@@ -206,7 +240,12 @@
if(status == CHKInsertSender.ROUTE_NOT_FOUND || status ==
CHKInsertSender.ROUTE_REALLY_NOT_FOUND) {
msg = DMT.createFNPRouteNotFound(uid, sender.getHTL());
- source.send(msg);
+ try {
+ source.send(msg);
+ } catch (NotConnectedException e) {
+ Logger.minor(this, "Lost connection to
source");
+ return;
+ }
canCommit = true;
finish();
return;
@@ -215,7 +254,12 @@
if(status == CHKInsertSender.SUCCESS) {
msg = DMT.createFNPInsertReply(uid);
sentSuccess = true;
- source.send(msg);
+ try {
+ source.send(msg);
+ } catch (NotConnectedException e) {
+ Logger.minor(this, "Lost connection to
source");
+ return;
+ }
canCommit = true;
finish();
return;
@@ -224,19 +268,17 @@
// Otherwise...?
Logger.error(this, "Unknown status code:
"+sender.getStatusString());
msg = DMT.createFNPRejectedOverload(uid, true);
- source.send(msg);
+ try {
+ source.send(msg);
+ } catch (NotConnectedException e) {
+ // Ignore
+ }
finish();
return;
}
- } catch (Throwable t) {
- Logger.error(this, "Caught "+t, t);
- } finally {
- Logger.minor(this, "Exiting InsertHandler.run() for "+uid);
- node.unlockUID(uid);
- }
- }
+ }
- private boolean canCommit = false;
+ private boolean canCommit = false;
private boolean sentCompletion = false;
private Object sentCompletionLock = new Object();
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2006-01-06 22:12:27 UTC (rev 7771)
+++ trunk/freenet/src/freenet/node/Node.java 2006-01-06 22:41:05 UTC (rev 7772)
@@ -49,6 +49,7 @@
import freenet.keys.ClientKeyBlock;
import freenet.keys.Key;
import freenet.keys.NodeCHK;
+import freenet.keys.NodeSSK;
import freenet.keys.SSKBlock;
import freenet.store.BerkeleyDBFreenetStore;
import freenet.store.FreenetStore;
@@ -843,6 +844,14 @@
}
}
+ public synchronized void store(SSKBlock block) {
+ try {
+ sskDatastore.put(block);
+ } catch (IOException e) {
+ Logger.error(this, "Cannot store data: "+e, e);
+ }
+ }
+
/**
* Remove a sender from the set of currently transferring senders.
*/
@@ -928,6 +937,24 @@
return is;
}
+ public synchronized SSKInsertSender makeInsertSender(SSKBlock block, short
htl, long uid, PeerNode source,
+ boolean fromStore, double closestLoc, boolean cache) {
+ Key key = block.getKey();
+ Logger.minor(this,
"makeInsertSender("+key+","+htl+","+uid+","+source+",...,"+fromStore);
+ KeyHTLPair kh = new KeyHTLPair(key, htl);
+ SSKInsertSender is = (SSKInsertSender) insertSenders.get(kh);
+ if(is != null) {
+ Logger.minor(this, "Found "+is+" for "+kh);
+ return is;
+ }
+ if(fromStore && !cache)
+ throw new IllegalArgumentException("From store = true but cache
= false !!!");
+ is = new SSKInsertSender(block, uid, htl, source, this, fromStore,
closestLoc);
+ Logger.minor(this, is.toString()+" for "+kh.toString());
+ insertSenders.put(kh, is);
+ return is;
+ }
+
public boolean lockUID(long uid) {
Logger.minor(this, "Locking "+uid);
Long l = new Long(uid);
Added: trunk/freenet/src/freenet/node/SSKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertHandler.java 2006-01-06 22:12:27 UTC (rev 7771)
+++ trunk/freenet/src/freenet/node/SSKInsertHandler.java 2006-01-06 22:41:05 UTC (rev 7772)
@@ -0,0 +1,277 @@
+package freenet.node;
+
+import java.io.IOException;
+
+import freenet.crypt.DSAPublicKey;
+import freenet.io.comm.DMT;
+import freenet.io.comm.DisconnectedException;
+import freenet.io.comm.Message;
+import freenet.io.comm.MessageFilter;
+import freenet.io.comm.NotConnectedException;
+import freenet.io.xfer.BlockReceiver;
+import freenet.keys.NodeCHK;
+import freenet.keys.NodeSSK;
+import freenet.keys.SSKBlock;
+import freenet.keys.SSKVerifyException;
+import freenet.support.Logger;
+import freenet.support.ShortBuffer;
+
+/**
+ * Handles an incoming SSK insert.
+ * SSKs need their own insert/request classes, see comments in SSKInsertSender.
+ */
+public class SSKInsertHandler implements Runnable {
+
+ static final int PUBKEY_TIMEOUT = 10000;
+
+ final Message req;
+ final Node node;
+ final long uid;
+ final PeerNode source;
+ final NodeSSK key;
+ final long startTime;
+ private SSKBlock block;
+ private DSAPublicKey pubKey;
+ private double closestLoc;
+ private short htl;
+ private SSKInsertSender sender;
+ private byte[] data;
+ private byte[] headers;
+ private BlockReceiver br;
+ private Thread runThread;
+ private boolean sentSuccess;
+ private boolean canCommit;
+
+ SSKInsertHandler(Message req, long id, Node node, long startTime) {
+ this.req = req;
+ this.node = node;
+ this.uid = id;
+ this.source = (PeerNode) req.getSource();
+ this.startTime = startTime;
+ key = (NodeSSK) req.getObject(DMT.FREENET_ROUTING_KEY);
+ htl = req.getShort(DMT.HTL);
+ closestLoc = req.getDouble(DMT.NEAREST_LOCATION);
+ double targetLoc = key.toNormalizedDouble();
+ double myLoc = node.lm.getLocation().getValue();
+ if(Math.abs(targetLoc - myLoc) < Math.abs(targetLoc - closestLoc))
+ closestLoc = myLoc;
+ byte[] pubKeyHash =
((ShortBuffer)req.getObject(DMT.PUBKEY_HASH)).getData();
+ pubKey = node.getKey(pubKeyHash);
+ data = ((ShortBuffer) req.getObject(DMT.DATA)).getData();
+ headers = ((ShortBuffer) req.getObject(DMT.BLOCK_HEADERS)).getData();
+ canCommit = false;
+ }
+
+ public String toString() {
+ return super.toString()+" for "+uid;
+ }
+
+ public void run() {
+ try {
+ realRun();
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t, t);
+ } finally {
+ Logger.minor(this, "Exiting InsertHandler.run() for "+uid);
+ node.unlockUID(uid);
+ }
+ }
+
+ private void realRun() {
+ runThread = Thread.currentThread();
+
+ // Send Accepted
+ Message accepted = DMT.createFNPSSKAccepted(uid, pubKey == null);
+
+ try {
+ source.send(accepted);
+ } catch (NotConnectedException e1) {
+ Logger.minor(this, "Lost connection to source");
+ return;
+ }
+
+ if(pubKey == null) {
+ // Wait for pub key
+
+ MessageFilter mfPK =
MessageFilter.create().setType(DMT.FNPSSKPubKey).setField(DMT.UID,
uid).setSource(source).setTimeout(PUBKEY_TIMEOUT);
+
+ try {
+ Message pk = node.usm.waitFor(mfPK);
+ if(pk == null) {
+ Logger.normal(this, "Failed to receive
FNPSSKPubKey for "+uid);
+ return;
+ }
+ byte[] pubkeyAsBytes =
((ShortBuffer)pk.getObject(DMT.PUBKEY_AS_BYTES)).getData();
+ try {
+ pubKey = new
DSAPublicKey(pubkeyAsBytes);
+ } catch (IOException e) {
+ Logger.error(this, "Invalid pubkey from
"+source+" on "+uid);
+ Message msg =
DMT.createFNPDataInsertRejected(uid, DMT.DATA_INSERT_REJECTED_SSK_ERROR);
+ try {
+ source.send(msg);
+ } catch (NotConnectedException ee) {
+ // Ignore
+ }
+ return;
+ }
+ } catch (DisconnectedException e) {
+ Logger.minor(this, "Lost connection to source");
+ return;
+ }
+ }
+
+ // Now we have the data, the headers and the pubkey. Commit it.
+
+ try {
+ block = new SSKBlock(data, headers, key, false);
+ } catch (SSKVerifyException e1) {
+ Logger.error(this, "Invalid SSK from "+source);
+ Message msg = DMT.createFNPDataInsertRejected(uid,
DMT.DATA_INSERT_REJECTED_SSK_ERROR);
+ try {
+ source.send(msg);
+ } catch (NotConnectedException e) {
+ // Ignore
+ }
+ return;
+ }
+ Logger.minor(this, "Committed SSK "+key+" for "+uid);
+
+ if(htl == 0) {
+ Message msg = DMT.createFNPInsertReply(uid);
+ sentSuccess = true;
+ try {
+ source.send(msg);
+ } catch (NotConnectedException e) {
+ // Ignore
+ }
+ canCommit = true;
+ finish();
+ return;
+ }
+
+ if(htl > 0)
+ sender = node.makeInsertSender(block, htl, uid, source, false,
closestLoc, true);
+
+ boolean receivedRejectedOverload = false;
+
+ while(true) {
+ synchronized(sender) {
+ try {
+ if(sender.getStatus() == SSKInsertSender.NOT_FINISHED)
+ sender.wait(5000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+
+ if((!receivedRejectedOverload) &&
sender.receivedRejectedOverload()) {
+ receivedRejectedOverload = true;
+ // Forward it
+ Message m = DMT.createFNPRejectedOverload(uid, false);
+ try {
+ source.send(m);
+ } catch (NotConnectedException e) {
+ Logger.minor(this, "Lost connection to
source");
+ return;
+ }
+ }
+
+ if(sender.hasRecentlyCollided()) {
+ // Forward collision
+ data = sender.getData();
+ headers = sender.getHeaders();
+ try {
+ block = new SSKBlock(data, headers,
key, true);
+ } catch (SSKVerifyException e1) {
+ // Is verified elsewhere...
+ throw new Error("Impossible: "+e1);
+ }
+ Message msg = DMT.createFNPSSKDataFound(uid, key, headers,
data, sender.getPubkeyHash());
+ try {
+ source.send(msg);
+ } catch (NotConnectedException e) {
+ Logger.minor(this, "Lost connection to source");
+ return;
+ }
+ }
+
+ int status = sender.getStatus();
+
+ if(status == CHKInsertSender.NOT_FINISHED) {
+ continue;
+ }
+
+ // Local RejectedOverload's (fatal).
+ // Internal error counts as overload. It'd only create a timeout
otherwise, which is the same thing anyway.
+ // We *really* need a good way to deal with nodes that constantly
R_O!
+ if(status == SSKInsertSender.TIMED_OUT ||
+ status == SSKInsertSender.GENERATED_REJECTED_OVERLOAD ||
+ status == SSKInsertSender.INTERNAL_ERROR) {
+ Message msg = DMT.createFNPRejectedOverload(uid, true);
+ try {
+ source.send(msg);
+ } catch (NotConnectedException e) {
+ Logger.minor(this, "Lost connection to
source");
+ return;
+ }
+ // Might as well store it anyway.
+ if(status == CHKInsertSender.TIMED_OUT ||
+ status ==
CHKInsertSender.GENERATED_REJECTED_OVERLOAD)
+ canCommit = true;
+ finish();
+ return;
+ }
+
+ if(status == CHKInsertSender.ROUTE_NOT_FOUND || status ==
CHKInsertSender.ROUTE_REALLY_NOT_FOUND) {
+ Message msg = DMT.createFNPRouteNotFound(uid, sender.getHTL());
+ try {
+ source.send(msg);
+ } catch (NotConnectedException e) {
+ Logger.minor(this, "Lost connection to
source");
+ return;
+ }
+ canCommit = true;
+ finish();
+ return;
+ }
+
+ if(status == CHKInsertSender.SUCCESS) {
+ Message msg = DMT.createFNPInsertReply(uid);
+ sentSuccess = true;
+ try {
+ source.send(msg);
+ } catch (NotConnectedException e) {
+ Logger.minor(this, "Lost connection to
source");
+ return;
+ }
+ canCommit = true;
+ finish();
+ return;
+ }
+
+ // Otherwise...?
+ Logger.error(this, "Unknown status code:
"+sender.getStatusString());
+ Message msg = DMT.createFNPRejectedOverload(uid, true);
+ try {
+ source.send(msg);
+ } catch (NotConnectedException e) {
+ // Ignore
+ }
+ finish();
+ return;
+ }
+ }
+
+ /**
+ * If canCommit, and we have received all the data, and it
+ * verifies, then commit it.
+ */
+ private void finish() {
+ Logger.minor(this, "Finishing");
+
+ if(canCommit) {
+ node.store(block);
+ }
+ }
+
+}
Added: trunk/freenet/src/freenet/node/SSKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertSender.java 2006-01-06 22:12:27 UTC (rev 7771)
+++ trunk/freenet/src/freenet/node/SSKInsertSender.java 2006-01-06 22:41:05 UTC (rev 7772)
@@ -0,0 +1,514 @@
+package freenet.node;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.HashSet;
+
+import freenet.crypt.DSAPublicKey;
+import freenet.io.comm.DMT;
+import freenet.io.comm.DisconnectedException;
+import freenet.io.comm.Message;
+import freenet.io.comm.MessageFilter;
+import freenet.io.comm.NotConnectedException;
+import freenet.keys.NodeSSK;
+import freenet.keys.SSKBlock;
+import freenet.keys.SSKVerifyException;
+import freenet.support.Logger;
+import freenet.support.ShortBuffer;
+
+/**
+ * SSKs require separate logic for inserts and requests, for various reasons:
+ * - SSKs can collide.
+ * - SSKs have only 1kB of data, so we can pack it into the DataReply, and we
don't need to
+ * wait for a long data-transfer timeout.
+ * - SSKs have pubkeys, which don't always need to be sent.
+ */
+public class SSKInsertSender implements Runnable, AnyInsertSender {
+
+ // Constants
+ static final int ACCEPTED_TIMEOUT = 10000;
+ static final int SEARCH_TIMEOUT = 60000;
+
+ // Basics
+ final NodeSSK myKey;
+ final double target;
+ final long uid;
+ short htl;
+ final PeerNode source;
+ final Node node;
+ /** SSK's pubkey */
+ final DSAPublicKey pubKey;
+ /** SSK's pubkey's hash */
+ final byte[] pubKeyHash;
+ /** Data (we know at start of insert) - can change if we get a collision */
+ byte[] data;
+ /** Headers (we know at start of insert) - can change if we get a
collision */
+ byte[] headers;
+ final boolean fromStore;
+ final double closestLocation;
+ final long startTime;
+ private boolean sentRequest;
+ private boolean hasCollided;
+ private boolean hasRecentlyCollided;
+
+ /** Time at which we set status to a value other than NOT_FINISHED */
+ private long setStatusTime = -1;
+
+ private int status = -1;
+ /** Still running */
+ static final int NOT_FINISHED = -1;
+ /** Successful insert */
+ static final int SUCCESS = 0;
+ /** Route not found */
+ static final int ROUTE_NOT_FOUND = 1;
+ /** Internal error */
+ static final int INTERNAL_ERROR = 3;
+ /** Timed out waiting for response */
+ static final int TIMED_OUT = 4;
+ /** Locally Generated a RejectedOverload */
+ static final int GENERATED_REJECTED_OVERLOAD = 5;
+ /** Could not get off the node at all! */
+ static final int ROUTE_REALLY_NOT_FOUND = 6;
+
+ SSKInsertSender(SSKBlock block, long uid, short htl, PeerNode source, Node
node, boolean fromStore, double closestLoc) {
+ this.fromStore = fromStore;
+ this.closestLocation = closestLoc;
+ this.node = node;
+ this.source = source;
+ this.htl = htl;
+ this.uid = uid;
+ myKey = (NodeSSK) block.getKey();
+ data = block.getRawData();
+ headers = block.getRawHeaders();
+ target = myKey.toNormalizedDouble();
+ pubKey = myKey.getPubKey();
+ if(pubKey == null)
+ throw new IllegalArgumentException("Must have pubkey to insert
data!!");
+ // pubKey.fingerprint() is not the same as hash(pubKey.asBytes())).
FIXME it should be!
+ byte[] pubKeyAsBytes = pubKey.asBytes();
+ try {
+ MessageDigest md256 =
MessageDigest.getInstance("SHA-256");
+ pubKeyHash = md256.digest(pubKeyAsBytes);
+ } catch (NoSuchAlgorithmException e) {
+ throw new Error("SHA-256 not supported by system!: "+e);
+ }
+
+ startTime = System.currentTimeMillis();
+ Thread t = new Thread(this, "SSKInsertSender for UID "+uid+" on
"+node.portNumber+" at "+System.currentTimeMillis());
+ t.setDaemon(true);
+ t.start();
+ }
+
+ public void run() {
+ short origHTL = htl;
+
+ try {
+ realRun();
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t, t);
+ if(status == NOT_FINISHED)
+ finish(INTERNAL_ERROR, null);
+ } finally {
+ node.completed(uid);
+ node.removeInsertSender(myKey, origHTL, this);
+ }
+ }
+
+ private void realRun() {
+ HashSet nodesRoutedTo = new HashSet();
+ HashSet nodesNotIgnored = new HashSet();
+
+ while(true) {
+
+ if(htl == 0) {
+ // Send an InsertReply back
+ finish(SUCCESS, null);
+ return;
+ }
+
+ // Route it
+ PeerNode next;
+ // Can backtrack, so only route to nodes closer than we are to
target.
+ double nextValue;
+ synchronized(node.peers) {
+ next = node.peers.closerPeer(source, nodesRoutedTo,
nodesNotIgnored, target, true);
+ if(next != null)
+ nextValue = next.getLocation().getValue();
+ else
+ nextValue = -1.0;
+ }
+
+ if(next == null) {
+ // Backtrack
+ finish(ROUTE_NOT_FOUND, null);
+ return;
+ }
+ Logger.minor(this, "Routing insert to "+next);
+ nodesRoutedTo.add(next);
+
+ if(Math.abs(target - nextValue) > Math.abs(target -
closestLocation)) {
+ Logger.minor(this, "Backtracking: target="+target+"
next="+nextValue+" closest="+closestLocation);
+ htl = node.decrementHTL(source, htl);
+ }
+
+ Message req = DMT.createFNPSSKInsertRequest(uid, htl, myKey,
closestLocation, headers, data, pubKeyHash);
+
+ // Wait for ack or reject... will come before even a locally
generated DataReply
+
+ MessageFilter mfAccepted =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPSSKAccepted);
+ MessageFilter mfRejectedLoop =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedLoop);
+ MessageFilter mfRejectedOverload =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedOverload);
+ // mfRejectedOverload must be the last thing in the or
+ // So its or pointer remains null
+ // Otherwise we need to recreate it below
+ mfRejectedOverload.clearOr();
+ MessageFilter mf =
mfAccepted.or(mfRejectedLoop.or(mfRejectedOverload));
+
+ // Send to next node
+
+ try {
+ next.sendAsync(req, null);
+ } catch (NotConnectedException e1) {
+ Logger.minor(this, "Not connected to "+next);
+ continue;
+ }
+ sentRequest = true;
+
+ Message msg = null;
+
+ /*
+ * Because messages may be re-ordered, it is
+ * entirely possible that we get a non-local RejectedOverload,
+ * followed by an Accepted. So we must loop here.
+ */
+
+ while (true) {
+
+ try {
+ msg = node.usm.waitFor(mf);
+ } catch (DisconnectedException e) {
+ Logger.normal(this, "Disconnected from
" + next
+ + " while waiting for
Accepted");
+ break;
+ }
+
+ if (msg == null) {
+ // Terminal overload
+ // Try to propagate back to source
+ Logger.minor(this, "Timeout");
+ next.localRejectedOverload();
+ finish(TIMED_OUT, next);
+ return;
+ }
+
+ if (msg.getSpec() == DMT.FNPRejectedOverload) {
+ // Non-fatal - probably still have time
left
+ if (msg.getBoolean(DMT.IS_LOCAL)) {
+ next.localRejectedOverload();
+ Logger.minor(this,
+
"Local RejectedOverload, moving on to next peer");
+ // Give up on this one, try
another
+ break;
+ } else {
+ forwardRejectedOverload();
+ }
+ continue;
+ }
+
+ if (msg.getSpec() == DMT.FNPRejectedLoop) {
+ next.successNotOverload();
+ // Loop - we don't want to send the
data to this one
+ break;
+ }
+
+ if (msg.getSpec() != DMT.FNPSSKAccepted) {
+ Logger.error(this,
+ "Unexpected message
waiting for SSKAccepted: "
+ + msg);
+ break;
+ }
+ // Otherwise is an FNPSSKAccepted
+ break;
+ }
+
+ if(msg == null || msg.getSpec() != DMT.FNPSSKAccepted) continue;
+
+ Logger.minor(this, "Got Accepted on "+this);
+
+ // Firstly, do we need to send them the pubkey?
+
+ if(msg.getBoolean(DMT.NEED_PUB_KEY)) {
+ Message pkMsg = DMT.createFNPSSKPubKey(uid, pubKey.asBytes());
+ try {
+ next.sendAsync(pkMsg, null);
+ } catch (NotConnectedException e) {
+ Logger.minor(this, "Node disconnected while sending
pubkey: "+next);
+ continue;
+ }
+
+ // Wait for the SSKPubKeyAccepted
+
+ MessageFilter mf1 =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPSSKPubKeyAccepted);
+
+ Message newAck;
+ try {
+ newAck = node.usm.waitFor(mf1);
+ } catch (DisconnectedException e) {
+ Logger.minor(this, "Disconnected from
"+next);
+ htl--;
+ break;
+ }
+
+ if(newAck == null) {
+ // Terminal overload
+ // Try to propagate back to source
+ Logger.minor(this, "Timeout");
+ next.localRejectedOverload();
+ finish(TIMED_OUT, next);
+ return;
+ }
+ }
+
+ // We have sent them the pubkey, and the data.
+ // Wait for the response.
+
+ /** What are we waiting for now??:
+ * - FNPRouteNotFound - couldn't exhaust HTL, but send us the
+ * data anyway please
+ * - FNPInsertReply - used up all HTL, yay
+ * - FNPRejectOverload - propagating an overload error :(
+ * - FNPDataFound - target already has the data, and the data is
+ * an SVK/SSK/KSK, therefore could be different to what we are
+ * inserting.
+ * - FNPDataInsertRejected - the insert was invalid
+ */
+
+ MessageFilter mfInsertReply =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(SEARCH_TIMEOUT).setType(DMT.FNPInsertReply);
+ mfRejectedOverload.setTimeout(SEARCH_TIMEOUT);
+ mfRejectedOverload.clearOr();
+ MessageFilter mfRouteNotFound =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(SEARCH_TIMEOUT).setType(DMT.FNPRouteNotFound);
+ MessageFilter mfDataInsertRejected =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(SEARCH_TIMEOUT).setType(DMT.FNPDataInsertRejected);
+ MessageFilter mfDataFound =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(SEARCH_TIMEOUT).setType(DMT.FNPSSKDataFound);
+
+ mf =
mfRouteNotFound.or(mfInsertReply.or(mfRejectedOverload.or(mfDataFound.or(mfDataInsertRejected))));
+
+ while (true) {
+ try {
+ msg = node.usm.waitFor(mf);
+ } catch (DisconnectedException e) {
+ Logger.normal(this, "Disconnected from
" + next
+ + " while waiting for
InsertReply on " + this);
+ break;
+ }
+
+ if (msg == null) {
+ // Timeout :(
+ // Fairly serious problem
+ Logger.error(this, "Timeout (" + msg
+ + ") after Accepted in
insert");
+ // Terminal overload
+ // Try to propagate back to source
+ next.localRejectedOverload();
+ finish(TIMED_OUT, next);
+ return;
+ }
+
+ if (msg.getSpec() == DMT.FNPRejectedOverload) {
+ // Probably non-fatal, if so, we have
time left, can try next one
+ if (msg.getBoolean(DMT.IS_LOCAL)) {
+ next.localRejectedOverload();
+ Logger.minor(this,
+ "Local
RejectedOverload, moving on to next peer");
+ // Give up on this one, try
another
+ break;
+ } else {
+ forwardRejectedOverload();
+ }
+ continue; // Wait for any further
response
+ }
+
+ if (msg.getSpec() == DMT.FNPRouteNotFound) {
+ Logger.minor(this, "Rejected: RNF");
+ short newHtl = msg.getShort(DMT.HTL);
+ if (htl > newHtl)
+ htl = newHtl;
+ // Finished as far as this node is
concerned
+ next.successNotOverload();
+ break;
+ }
+
+ if (msg.getSpec() == DMT.FNPDataInsertRejected)
{
+ next.successNotOverload();
+ short reason =
msg.getShort(DMT.DATA_INSERT_REJECTED_REASON);
+ Logger.minor(this, "DataInsertRejected:
" + reason);
+ if (reason ==
DMT.DATA_INSERT_REJECTED_VERIFY_FAILED) {
+ if (fromStore) {
+ // That's odd...
+
Logger.error(this,"Verify failed on next node "
+ + next
+ " for DataInsert but we were sending from the store!");
+ }
+ }
+ Logger.error(this, "SSK insert
rejected! Reason="
+ +
DMT.getDataInsertRejectedReason(reason));
+ break; // What else can we do?
+ }
+
+ if (msg.getSpec() == DMT.FNPSSKDataFound) {
+ /**
+ * Data was already on node, and was
NOT equal to what we sent. COLLISION!
+ *
+ * We can either accept the old data or
the new data.
+ * OLD DATA:
+ * - KSK-based stuff is usable. Well,
somewhat; a node could spoof KSKs on
+ * receiving an insert, (if it knows
them in advance), but it cannot just
+ * start inserts to overwrite old SSKs.
+ * - You cannot "update" an SSK.
+ * NEW DATA:
+ * - KSK-based stuff not usable. (Some
people think this is a good idea!).
+ * - Illusion of updatability. (VERY
BAD IMHO, because it's not really
+ * updatable... FIXME implement TUKs;
would determine latest version based
+ * on version number, and propagate on
request with a certain probability or
+ * according to time. However there are
good arguments to do updating at a
+ * higher level (e.g. key bottleneck
argument), and TUKs should probably be
+ * distinct from SSKs.
+ *
+ * For now, accept the "old" i.e.
preexisting data.
+ */
+ Logger.normal(this, "Got collision on
"+myKey+" ("+uid+") sending to "+next.getPeer());
+
+ // FNPSSKDataFound == FNPInsertRequest
+
+ // Lets assume the pubkeys are the same
(otherwise SHA-256 has been broken
+ // and we're completely screwed anyway).
+
+ byte[] newData = ((ShortBuffer)
msg.getObject(DMT.DATA)).getData();
+ byte[] newHeaders = ((ShortBuffer)
msg.getObject(DMT.BLOCK_HEADERS)).getData();
+ if(Arrays.equals(newData, data) &&
Arrays.equals(newHeaders, headers)) {
+ Logger.error(this, "Node sent
us collision but data and headers are identical!! from "+next+" on "+uid);
+ // Try next node, this one is
evil!
+ break;
+ }
+
+ SSKBlock newBlock;
+ try {
+ newBlock = new
SSKBlock(newData, newHeaders, myKey, false);
+ } catch (SSKVerifyException e) {
+ Logger.error(this, "Node sent
us collision but got corrupt SSK!! from "+next+" on "+uid);
+ // Try next node, no way to
tell this one about its mistake as it's stopped listening. FIXME should it?
+ break;
+ }
+
+ data = newData;
+ headers = newHeaders;
+ synchronized(this) {
+ hasRecentlyCollided = true;
+ hasCollided = true;
+ notifyAll();
+ }
+ }
+
+ if (msg.getSpec() != DMT.FNPInsertReply) {
+ Logger.error(this, "Unknown reply: " +
msg);
+ finish(INTERNAL_ERROR, next);
+ }
+
+ // Our task is complete
+ next.successNotOverload();
+ finish(SUCCESS, next);
+ return;
+ }
+ }
+ }
+
+ /**
+  * Set once forwardRejectedOverload() has run. In the code visible here it
+  * is only read and written from synchronized methods.
+  */
+ private boolean hasForwardedRejectedOverload = false;
+
+ /**
+  * @return true once forwardRejectedOverload() has been called on this
+  * sender, false before that.
+  */
+ synchronized boolean receivedRejectedOverload() {
+     return this.hasForwardedRejectedOverload;
+ }
+
+ /**
+  * Forward RejectedOverload to the request originator: raises the
+  * hasForwardedRejectedOverload flag and wakes every thread waiting on this
+  * sender. Only the first call has any effect.
+  * DO NOT CALL if have a *local* RejectedOverload.
+  */
+ private synchronized void forwardRejectedOverload() {
+     if(!hasForwardedRejectedOverload) {
+         hasForwardedRejectedOverload = true;
+         notifyAll();
+     }
+ }
+
+ /**
+  * Record the final status of this sender and wake all waiters.
+  * May only be called once: a second call throws.
+  * @param code The status code to finish with. ROUTE_NOT_FOUND is recorded
+  * as ROUTE_REALLY_NOT_FOUND when no request was ever sent.
+  * @param next Not used by this method.
+  * @throws IllegalStateException if a status other than NOT_FINISHED has
+  * already been recorded.
+  */
+ private void finish(int code, PeerNode next) {
+     // The throwaway exception is only there to get a stack trace into the log.
+     Logger.minor(this, "Finished: "+code+" on "+this,
+             new Exception("debug"));
+     if(status != NOT_FINISHED) {
+         String complaint =
+             "finish() called with "+code+" when was already "+status;
+         throw new IllegalStateException(complaint);
+     }
+
+     setStatusTime = System.currentTimeMillis();
+
+     // RNF without ever having sent a request is reported as its own status.
+     boolean neverSentAnything = (code == ROUTE_NOT_FOUND) && !sentRequest;
+     status = neverSentAnything ? ROUTE_REALLY_NOT_FOUND : code;
+
+     synchronized(this) {
+         notifyAll();
+     }
+
+     // Nothing to wait for, no downstream transfers, just exit.
+     Logger.minor(this, "Set status code: "+getStatusString());
+ }
+
+ /**
+  * @return The current status code, i.e. whatever finish() last stored
+  * (see getStatusString() for the known codes).
+  */
+ public int getStatus() {
+     return this.status;
+ }
+
+ /**
+  * @return The current value of the hops-to-live counter held by this sender.
+  */
+ public short getHTL() {
+     return this.htl;
+ }
+
+ /**
+  * Translate the current status code into a human-readable label.
+  * @return The current status as a string; codes this method does not know
+  * are rendered as "UNKNOWN STATUS CODE: " followed by the number.
+  */
+ public String getStatusString() {
+     String label;
+     if(status == SUCCESS) {
+         label = "SUCCESS";
+     } else if(status == ROUTE_NOT_FOUND) {
+         label = "ROUTE NOT FOUND";
+     } else if(status == NOT_FINISHED) {
+         label = "NOT FINISHED";
+     } else if(status == INTERNAL_ERROR) {
+         label = "INTERNAL ERROR";
+     } else if(status == TIMED_OUT) {
+         label = "TIMED OUT";
+     } else if(status == GENERATED_REJECTED_OVERLOAD) {
+         label = "GENERATED REJECTED OVERLOAD";
+     } else if(status == ROUTE_REALLY_NOT_FOUND) {
+         label = "ROUTE REALLY NOT FOUND";
+     } else {
+         label = "UNKNOWN STATUS CODE: "+status;
+     }
+     return label;
+ }
+
+ /**
+  * @return The sentRequest flag; finish() consults the same flag to tell
+  * ROUTE_NOT_FOUND apart from ROUTE_REALLY_NOT_FOUND.
+  */
+ public boolean sentRequest() {
+     return this.sentRequest;
+ }
+
+ /**
+  * Read-and-reset accessor for the collision flag.
+  * @return true if the hasRecentlyCollided flag was set since the previous
+  * call; the flag is cleared before returning.
+  */
+ public synchronized boolean hasRecentlyCollided() {
+     final boolean collidedSinceLastCall = hasRecentlyCollided;
+     hasRecentlyCollided = false;
+     return collidedSinceLastCall;
+ }
+
+ /**
+  * @return The hasCollided flag. Unlike hasRecentlyCollided(), reading it
+  * does not reset it.
+  */
+ public boolean hasCollided() {
+     return this.hasCollided;
+ }
+
+ /**
+  * NOTE(review): despite its name this returns the headers array, i.e.
+  * exactly the same value as getHeaders(). That looks like a copy-paste
+  * slip; presumably it should return the hash of the public key instead.
+  * TODO confirm which field or accessor holds the pubkey hash and fix.
+  * @return Currently: the block headers (not a pubkey hash).
+  */
+ public byte[] getPubkeyHash() {
+ return headers;
+ }
+
+ /**
+  * @return The headers array currently held by this sender (the array
+  * itself, not a copy).
+  */
+ public byte[] getHeaders() {
+     return this.headers;
+ }
+
+ /**
+  * @return The data array currently held by this sender (the array itself,
+  * not a copy).
+  */
+ public byte[] getData() {
+     return this.data;
+ }
+
+}
Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2006-01-06 22:12:27 UTC (rev
7771)
+++ trunk/freenet/src/freenet/node/Version.java 2006-01-06 22:41:05 UTC (rev
7772)
@@ -20,7 +20,7 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- public static final int buildNumber = 325;
+ public static final int buildNumber = 326;
/** Oldest build of Fred we will talk to */
public static final int lastGoodBuild = 318;
Modified: trunk/freenet/src/freenet/store/BaseFreenetStore.java
===================================================================
--- trunk/freenet/src/freenet/store/BaseFreenetStore.java 2006-01-06
22:12:27 UTC (rev 7771)
+++ trunk/freenet/src/freenet/store/BaseFreenetStore.java 2006-01-06
22:41:05 UTC (rev 7772)
@@ -7,7 +7,11 @@
import freenet.keys.CHKBlock;
import freenet.keys.CHKVerifyException;
+import freenet.keys.KeyBlock;
import freenet.keys.NodeCHK;
+import freenet.keys.NodeSSK;
+import freenet.keys.SSKBlock;
+import freenet.keys.SSKVerifyException;
import freenet.support.Fields;
import freenet.support.Logger;
@@ -91,11 +95,55 @@
}
/**
+ * Retrieve a block.
+ * @return null if there is no such block stored, otherwise the block.
+ */
+ public synchronized SSKBlock fetch(NodeSSK chk, boolean dontPromote)
+         throws IOException {
+     final byte[] payload = dataStore.getDataForBlock(chk, dontPromote);
+     if(payload == null) {
+         // No data: headers left behind for this key are orphans, drop them.
+         final boolean orphanHeaders =
+             headersStore.getDataForBlock(chk, true) != null;
+         if(orphanHeaders) {
+             Logger.normal(this, "Deleting: "+chk+" headers, no data");
+             headersStore.delete(chk);
+         }
+         return null;
+     }
+
+     final byte[] rawHeaders = headersStore.getDataForBlock(chk, dontPromote);
+     if(rawHeaders == null) {
+         // Data without headers cannot be decoded, drop it.
+         Logger.normal(this, "Deleting: "+chk+" data, no headers");
+         dataStore.delete(chk);
+         return null;
+     }
+
+     // The stored header block starts with the real header length as a
+     // two-byte big-endian number; the headers themselves follow it.
+     final int hi = rawHeaders[0] & 0xff;
+     final int lo = rawHeaders[1] & 0xff;
+     final int headerLen = (hi << 8) | lo;
+     if(HEADER_BLOCK_SIZE-2 < headerLen) {
+         Logger.normal(this, "Invalid header data on "+chk+", deleting");
+         dataStore.delete(chk);
+         headersStore.delete(chk);
+         return null;
+     }
+
+     final byte[] trimmed = new byte[headerLen];
+     System.arraycopy(rawHeaders, 2, trimmed, 0, headerLen);
+     Logger.minor(this, "Get key: "+chk);
+     Logger.minor(this, "Raw headers: "+rawHeaders.length+" bytes, hash "
+             +Fields.hashCode(rawHeaders));
+     Logger.minor(this, "Headers: "+headerLen+" bytes, hash "
+             +Fields.hashCode(trimmed));
+     Logger.minor(this, "Data: "+payload.length+" bytes, hash "
+             +Fields.hashCode(payload));
+
+     SSKBlock verified;
+     try {
+         verified = new SSKBlock(payload, trimmed, chk, true);
+     } catch (SSKVerifyException e) {
+         // A block that fails verification is removed from both stores.
+         Logger.normal(this, "Does not verify, deleting: "+chk);
+         dataStore.delete(chk);
+         headersStore.delete(chk);
+         verified = null;
+     }
+     return verified;
+ }
+
+ /**
* Store a block.
*/
- public synchronized void put(CHKBlock block) throws IOException {
- byte[] data = block.getData();
- byte[] headers = block.getHeaders();
+ public synchronized void put(KeyBlock block) throws IOException {
+ byte[] data = block.getRawData();
+ byte[] headers = block.getRawHeaders();
int hlen = headers.length;
if(data.length != DATA_BLOCK_SIZE || hlen > HEADER_BLOCK_SIZE-2)
throw new IllegalArgumentException("Too big - data:
"+data.length+" should be "+
Modified: trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java
===================================================================
--- trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java 2006-01-06
22:12:27 UTC (rev 7771)
+++ trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java 2006-01-06
22:41:05 UTC (rev 7772)
@@ -25,7 +25,11 @@
import freenet.keys.CHKBlock;
import freenet.keys.CHKVerifyException;
+import freenet.keys.KeyBlock;
import freenet.keys.NodeCHK;
+import freenet.keys.NodeSSK;
+import freenet.keys.SSKBlock;
+import freenet.keys.SSKVerifyException;
import freenet.support.Fields;
import freenet.support.Logger;
@@ -202,17 +206,97 @@
// return null;
}
+ /**
+  * Retrieve a block.
+  * Looks the routing key up in the database inside a transaction, reads the
+  * header and data from the store file at the recorded offset, and
+  * constructs an SSKBlock from them (passing true as the constructor's
+  * last argument).
+  * If the block fails verification its access time is set to zero and null
+  * is returned.
+  * @param chk The key to look up (an SSK, despite the parameter name).
+  * @param dontPromote If true, don't promote data if fetched.
+  * @return null if there is no such block stored (or the store is closed,
+  * or the stored block does not verify), otherwise the block.
+  * @throws IOException if anything else goes wrong; the original exception
+  * is attached as the cause.
+  */
+ public SSKBlock fetch(NodeSSK chk, boolean dontPromote) throws IOException
+ {
+     if(closed)
+         return null;
+
+     byte[] routingkey = chk.getRoutingKey();
+     DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
+     DatabaseEntry blockDBE = new DatabaseEntry();
+     // Both handles are reset to null as soon as they have been closed,
+     // committed or aborted, so that the catch block below releases exactly
+     // what is still open.
+     Cursor c = null;
+     Transaction t = null;
+     try{
+         t = environment.beginTransaction(null,null);
+         c = chkDB.openCursor(t,null);
+
+         if(c.getSearchKey(routingkeyDBE,blockDBE,LockMode.DEFAULT)
+                 !=OperationStatus.SUCCESS) {
+             c.close();
+             c = null;
+             t.abort();
+             t = null;
+             return null;
+         }
+
+         StoreBlock storeBlock = (StoreBlock)
+             storeBlockTupleBinding.entryToObject(blockDBE);
+
+         SSKBlock block = null;
+         try{
+             byte[] header = new byte[headerBlockSize];
+             byte[] data = new byte[dataBlockSize];
+             synchronized(chkStore) {
+                 chkStore.seek(storeBlock.offset
+                         *(long)(dataBlockSize+headerBlockSize));
+                 chkStore.read(header);
+                 chkStore.read(data);
+             }
+
+             block = new SSKBlock(data,header,chk, true);
+
+             if(!dontPromote)
+             {
+                 storeBlock.updateRecentlyUsed();
+                 DatabaseEntry updateDBE = new DatabaseEntry();
+                 storeBlockTupleBinding.objectToEntry(storeBlock, updateDBE);
+                 c.putCurrent(updateDBE);
+                 c.close();
+                 c = null;
+                 t.commit();
+                 t = null;
+             }else{
+                 c.close();
+                 c = null;
+                 t.abort();
+                 t = null;
+             }
+
+             // Log content hashes, not the array references.
+             Logger.minor(this, "Get key: "+chk);
+             Logger.minor(this, "Headers: "+header.length+" bytes, hash "
+                     +Fields.hashCode(header));
+             Logger.minor(this, "Data: "+data.length+" bytes, hash "
+                     +Fields.hashCode(data));
+
+         }catch(SSKVerifyException ex){
+             Logger.normal(this,
+                     "Does not verify, setting accessTime to 0 for : "+chk);
+             storeBlock.setRecentlyUsedToZero();
+             DatabaseEntry updateDBE = new DatabaseEntry();
+             storeBlockTupleBinding.objectToEntry(storeBlock, updateDBE);
+             c.putCurrent(updateDBE);
+             c.close();
+             c = null;
+             t.commit();
+             t = null;
+             return null;
+         }
+         return block;
+     }catch(Exception ex) { // FIXME: ugly
+         // Best-effort cleanup: the cursor has to go before the transaction.
+         if(c!=null) {
+             try{c.close();}catch(DatabaseException ex2){}
+         }
+         // Without this abort a failure anywhere above would leave the
+         // transaction open.
+         if(t!=null) {
+             try{t.abort();}catch(DatabaseException ex2){}
+         }
+         Logger.error(this, "Caught "+ex, ex);
+         ex.printStackTrace();
+         // Keep the original exception attached as the cause.
+         IOException ioe = new IOException(ex.getMessage());
+         ioe.initCause(ex);
+         throw ioe;
+     }
+ }
+
/**
* Store a block.
*/
- public void put(CHKBlock block) throws IOException
+ public void put(KeyBlock block) throws IOException
{
if(closed)
return;
byte[] routingkey = ((NodeCHK)block.getKey()).getRoutingKey();
- byte[] data = block.getData();
- byte[] header = block.getHeaders();
+ byte[] data = block.getRawData();
+ byte[] header = block.getRawHeaders();
if(data.length!=dataBlockSize) {
Logger.minor(this, "This data is "+data.length+" bytes. Should
be "+dataBlockSize);
Modified: trunk/freenet/src/freenet/store/FreenetStore.java
===================================================================
--- trunk/freenet/src/freenet/store/FreenetStore.java 2006-01-06 22:12:27 UTC
(rev 7771)
+++ trunk/freenet/src/freenet/store/FreenetStore.java 2006-01-06 22:41:05 UTC
(rev 7772)
@@ -3,7 +3,11 @@
import java.io.IOException;
import freenet.keys.CHKBlock;
+import freenet.keys.Key;
+import freenet.keys.KeyBlock;
import freenet.keys.NodeCHK;
+import freenet.keys.NodeSSK;
+import freenet.keys.SSKBlock;
/**
* Datastore interface
@@ -15,10 +19,17 @@
* @param dontPromote If true, don't promote data if fetched.
* @return null if there is no such block stored, otherwise the block.
*/
- public CHKBlock fetch(NodeCHK chk, boolean dontPromote) throws IOException;
+ public CHKBlock fetch(NodeCHK key, boolean dontPromote) throws IOException;
/**
+ * Retrieve a block.
+ * @param dontPromote If true, don't promote data if fetched.
+ * @return null if there is no such block stored, otherwise the block.
+ */
+ public SSKBlock fetch(NodeSSK key, boolean dontPromote) throws IOException;
+
+ /**
* Store a block.
*/
- public void put(CHKBlock block) throws IOException;
+ public void put(KeyBlock block) throws IOException;
}
\ No newline at end of file
Added: trunk/freenet/src/freenet/support/ImmutableByteArrayWrapper.java
===================================================================
--- trunk/freenet/src/freenet/support/ImmutableByteArrayWrapper.java
2006-01-06 22:12:27 UTC (rev 7771)
+++ trunk/freenet/src/freenet/support/ImmutableByteArrayWrapper.java
2006-01-06 22:41:05 UTC (rev 7772)
@@ -0,0 +1,30 @@
+/**
+ *
+ */
+package freenet.support;
+
+import java.util.Arrays;
+
+/**
+ * Wraps a byte array so that it can be compared by content and used as a
+ * key in hash-based collections. The hash code is computed once, at
+ * construction time.
+ */
+public class ImmutableByteArrayWrapper {
+    /** Private copy of the wrapped bytes; not modified by this class. */
+    final byte[] data;
+    /** Fields.hashCode() of the wrapped bytes, cached at construction. */
+    final int hashCode;
+
+    /**
+     * @param data The bytes to wrap. The array is copied, so later changes
+     * to the caller's array cannot make equals() disagree with the cached
+     * hashCode(). A null argument is passed through unchanged.
+     */
+    public ImmutableByteArrayWrapper(byte[] data) {
+        this.data = (data == null) ? null : (byte[]) data.clone();
+        hashCode = Fields.hashCode(this.data);
+    }
+
+    /**
+     * @return true if o is an ImmutableByteArrayWrapper holding the same
+     * byte sequence.
+     */
+    public boolean equals(Object o) {
+        if(o == this)
+            return true;
+        if(!(o instanceof ImmutableByteArrayWrapper))
+            return false;
+        ImmutableByteArrayWrapper w = (ImmutableByteArrayWrapper) o;
+        // Cheap rejection on the cached hash before the byte-by-byte compare.
+        return w.hashCode == hashCode && Arrays.equals(data, w.data);
+    }
+
+    /** @return The hash code cached by the constructor. */
+    public int hashCode() {
+        return hashCode;
+    }
+}
\ No newline at end of file