Author: toad
Date: 2006-05-16 19:52:59 +0000 (Tue, 16 May 2006)
New Revision: 8715
Added:
trunk/freenet/src/freenet/node/FastRunnable.java
Modified:
trunk/freenet/src/freenet/node/ARKFetcher.java
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/PacketSender.java
trunk/freenet/src/freenet/node/PeerNode.java
trunk/freenet/src/freenet/node/Version.java
trunk/freenet/src/freenet/support/ArrayBucket.java
Log:
710: Working (apparently) ARKs.
Modified: trunk/freenet/src/freenet/node/ARKFetcher.java
===================================================================
--- trunk/freenet/src/freenet/node/ARKFetcher.java 2006-05-16 18:43:30 UTC
(rev 8714)
+++ trunk/freenet/src/freenet/node/ARKFetcher.java 2006-05-16 19:52:59 UTC
(rev 8715)
@@ -1,12 +1,31 @@
package freenet.node;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import freenet.client.FetchException;
+import freenet.client.FetchResult;
+import freenet.client.FetcherContext;
+import freenet.client.InserterException;
+import freenet.client.async.BaseClientPutter;
+import freenet.client.async.ClientCallback;
+import freenet.client.async.ClientGetter;
+import freenet.keys.FreenetURI;
+import freenet.keys.USK;
+import freenet.support.ArrayBucket;
+import freenet.support.Logger;
+import freenet.support.SimpleFieldSet;
+
/**
* Fetch an ARK. Permanent, tied to a PeerNode, stops itself after a
successful fetch.
*/
-public class ARKFetcher {
+public class ARKFetcher implements ClientCallback {
final PeerNode peer;
final Node node;
+ private ClientGetter getter;
+ private FreenetURI fetchingURI;
+ private boolean shouldRun = false;
public ARKFetcher(PeerNode peer, Node node) {
this.peer = peer;
@@ -21,16 +40,91 @@
* recent).
*/
public void start() {
- // Start fetch
- // FIXME
+ ClientGetter cg = null;
+ synchronized(this) {
+ // Start fetch
+ shouldRun = true;
+ if(getter == null) {
+ USK ark = peer.getARK();
+ if(ark == null) {
+ return;
+ }
+ FreenetURI uri = ark.getURI();
+ fetchingURI = uri;
+ Logger.minor(this, "Fetching ARK: "+uri+" for
"+peer);
+ cg = new ClientGetter(this,
node.chkFetchScheduler, node.sskFetchScheduler,
+ uri, node.arkFetcherContext,
RequestStarter.INTERACTIVE_PRIORITY_CLASS,
+ this, new ArrayBucket());
+ getter = cg;
+ } else return; // already running
+ }
+ if(cg != null)
+ try {
+ cg.start();
+ } catch (FetchException e) {
+ onFailure(e, cg);
+ }
}
/**
* Called when the node connects successfully.
*/
- public void stop() {
+ public synchronized void stop() {
// Stop fetch
- // FIXME
+ Logger.minor(this, "Cancelling ARK fetch for "+peer);
+ shouldRun = false;
+ if(getter != null)
+ getter.cancel();
}
+
+ public void onSuccess(FetchResult result, ClientGetter state) {
+ Logger.minor(this, "Fetched ARK for "+peer, new
Exception("debug"));
+ // Fetcher context specifies an upper bound on size.
+ ArrayBucket bucket = (ArrayBucket) result.asBucket();
+ byte[] data = bucket.toByteArray();
+ String ref;
+ try {
+ ref = new String(data, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ // Yeah, right.
+ throw new Error(e);
+ }
+ SimpleFieldSet fs;
+ try {
+ fs = new SimpleFieldSet(ref, true);
+ peer.gotARK(fs);
+ } catch (IOException e) {
+ // Corrupt ref.
+ Logger.error(this, "Corrupt ARK reference? Fetched
"+fetchingURI+" got while parsing: "+e+" from:\n"+ref, e);
+ }
+ }
+
+ public void onFailure(FetchException e, ClientGetter state) {
+ Logger.minor(this, "Failed to fetch ARK for "+peer+" : "+e, e);
+ // If it's a redirect, follow the redirect and update the ARK.
+ // If it's any other error, wait a while then retry.
+ getter = null;
+ if(!shouldRun) return;
+ if(e.newURI != null) {
+ peer.updateARK(e.newURI);
+ start();
+ return;
+ }
+ // We may be on the PacketSender thread.
+ // FIXME should this be exponential backoff?
+ node.ps.queueTimedJob(new FastRunnable() { public void run() {
start(); }}, 20000L);
+ }
+
+ public void onSuccess(BaseClientPutter state) {
+ // Impossible.
+ }
+
+ public void onFailure(InserterException e, BaseClientPutter state) {
+ // Impossible.
+ }
+
+ public void onGeneratedURI(FreenetURI uri, BaseClientPutter state) {
+ // Impossible.
+ }
}
Added: trunk/freenet/src/freenet/node/FastRunnable.java
===================================================================
--- trunk/freenet/src/freenet/node/FastRunnable.java 2006-05-16 18:43:30 UTC
(rev 8714)
+++ trunk/freenet/src/freenet/node/FastRunnable.java 2006-05-16 19:52:59 UTC
(rev 8715)
@@ -0,0 +1,8 @@
+package freenet.node;
+
+/**
+ * Runnable which can be executed in-line on the PacketSender.
+ */
+public interface FastRunnable extends Runnable {
+
+}
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2006-05-16 18:43:30 UTC (rev
8714)
+++ trunk/freenet/src/freenet/node/Node.java 2006-05-16 19:52:59 UTC (rev
8715)
@@ -32,6 +32,7 @@
import freenet.client.ClientMetadata;
import freenet.client.FetchException;
import freenet.client.FetchResult;
+import freenet.client.FetcherContext;
import freenet.client.HighLevelSimpleClient;
import freenet.client.HighLevelSimpleClientImpl;
import freenet.client.InserterException;
@@ -160,7 +161,9 @@
SimpleFieldSet fs = exportPublicFieldSet();
// Remove some unnecessary fields that only cause
collisions.
- fs.remove("ark.number");
+
+ // Delete entire ark.* field for now. Changing this and
automatically moving to the new may be supported in future.
+ fs.remove("ark");
fs.remove("location");
//fs.remove("version"); - keep version because of its
significance in reconnection
@@ -430,6 +433,8 @@
private InsertableClientSSK myARK;
/** My ARK sequence number */
private long myARKNumber;
+ /** FetcherContext for ARKs */
+ public final FetcherContext arkFetcherContext;
private final HashSet runningUIDs;
@@ -1222,6 +1227,19 @@
// And finally, Initialize the plugin manager
pluginManager = new PluginManager(this);
+
+ FetcherContext ctx = makeClient((short)0).getFetcherContext();
+
+ ctx.allowSplitfiles = false;
+ ctx.dontEnterImplicitArchives = true;
+ ctx.maxArchiveRestarts = 0;
+ ctx.maxMetadataSize = 256;
+ ctx.maxNonSplitfileRetries = 10;
+ ctx.maxOutputLength = 4096;
+ ctx.maxRecursionLevel = 2;
+ ctx.maxTempLength = 4096;
+
+ this.arkFetcherContext = ctx;
}
private InetAddress resolve(String val) {
Modified: trunk/freenet/src/freenet/node/PacketSender.java
===================================================================
--- trunk/freenet/src/freenet/node/PacketSender.java 2006-05-16 18:43:30 UTC
(rev 8714)
+++ trunk/freenet/src/freenet/node/PacketSender.java 2006-05-16 19:52:59 UTC
(rev 8715)
@@ -1,6 +1,8 @@
package freenet.node;
import java.util.LinkedList;
+import java.util.TreeMap;
+import java.util.Vector;
import freenet.io.comm.DMT;
import freenet.io.comm.Message;
@@ -18,6 +20,8 @@
public class PacketSender implements Runnable {
final LinkedList resendPackets;
+ /** ~= Ticker :) */
+ private final TreeMap timedJobsByTime;
final Thread myThread;
final Node node;
long lastClearedOldSwapChains;
@@ -26,6 +30,7 @@
PacketSender(Node node) {
resendPackets = new LinkedList();
+ timedJobsByTime = new TreeMap();
this.node = node;
myThread = new Thread(this, "PacketSender thread for
"+node.portNumber);
myThread.setDaemon(true);
@@ -142,8 +147,7 @@
} else {
for(int j=0;j<messages.length;j++) {
Logger.minor(this, "PS Sending:
"+(messages[j].msg == null ? "(not a Message)" :
messages[j].msg.getSpec().getName()));
- if (messages[j].msg != null)
- {
+ if (messages[j].msg != null) {
pn.addToLocalNodeSentMessagesToStatistic(messages[j].msg);
}
}
@@ -173,8 +177,48 @@
node.lm.clearOldSwapChains();
lastClearedOldSwapChains = now;
}
+
// Send may have taken some time
now = System.currentTimeMillis();
+
+ Vector jobsToRun = null;
+
+ synchronized(timedJobsByTime) {
+ while(!timedJobsByTime.isEmpty()) {
+ Long tRun = (Long) timedJobsByTime.firstKey();
+ if(tRun.longValue() <= now) {
+ if(jobsToRun == null) jobsToRun = new
Vector();
+ Object o = timedJobsByTime.remove(tRun);
+ if(o instanceof Runnable[]) {
+ Runnable[] r = (Runnable[]) o;
+ for(int i=0;i<r.length;i++)
+ jobsToRun.add(r[i]);
+ } else {
+ Runnable r = (Runnable) o;
+ jobsToRun.add(r);
+ }
+ } else break;
+ }
+ }
+
+ if(jobsToRun != null)
+ for(int i=0;i<jobsToRun.size();i++) {
+ Runnable r = (Runnable) jobsToRun.get(i);
+ Logger.minor(this, "Running "+r);
+ if(r instanceof FastRunnable) {
+ // Run in-line
+ try {
+ r.run();
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t+"
running "+r, t);
+ }
+ } else {
+ Thread t = new Thread(r, "Scheduled job: "+r);
+ t.setDaemon(true);
+ t.start();
+ }
+ }
+
long sleepTime = nextActionTime - now;
// 200ms maximum sleep time
sleepTime = Math.min(sleepTime, 200);
@@ -204,4 +248,23 @@
notifyAll();
}
}
+
+ public void queueTimedJob(Runnable job, long offset) {
+ long now = System.currentTimeMillis();
+ Long l = new Long(offset + now);
+ synchronized(timedJobsByTime) {
+ Object o = timedJobsByTime.get(l);
+ if(o == null) {
+ timedJobsByTime.put(l, job);
+ } else if(o instanceof Runnable) {
+ timedJobsByTime.put(l, new Runnable[] {
(Runnable)o, job });
+ } else if(o instanceof Runnable[]) {
+ Runnable[] r = (Runnable[]) o;
+ Runnable[] jobs = new Runnable[r.length+1];
+ System.arraycopy(r, 0, jobs, 0, r.length);
+ jobs[jobs.length-1] = job;
+ timedJobsByTime.put(l, jobs);
+ }
+ }
+ }
}
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2006-05-16 18:43:30 UTC
(rev 8714)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2006-05-16 19:52:59 UTC
(rev 8715)
@@ -113,9 +113,12 @@
/** My ARK SSK public key */
private USK myARK;
- /** My ARK sequence number */
- private long myARKNumber;
+ /** Number of handshake attempts since last successful connection or ARK
fetch */
+ private int handshakeCount = 0;
+ /** After this many failed handshakes, we start the ARK fetcher. */
+ private static final int MAX_HANDSHAKE_COUNT = 2;
+
/** Current location in the keyspace */
private Location currentLocation;
@@ -348,26 +351,27 @@
// It belongs to this node, not to the node being described.
// Therefore, if we are parsing a remotely supplied ref, ignore it.
- if(!fromLocal) return;
+ if(fromLocal) {
- SimpleFieldSet metadata = fs.subset("metadata");
-
- if(metadata != null) {
-
- // Don't be tolerant of nonexistant domains; this should be an
IP address.
- Peer p;
- try {
- p = new Peer(metadata.get("detected.udp"),
false);
- } catch (UnknownHostException e) {
- p = null;
- Logger.error(this, "detected.udp =
"+metadata.get("detected.udp")+" - "+e, e);
- } catch (PeerParseException e) {
- p = null;
- Logger.error(this, "detected.udp =
"+metadata.get("detected.udp")+" - "+e, e);
- }
- if(p != null)
- detectedPeer = p;
+ SimpleFieldSet metadata = fs.subset("metadata");
+ if(metadata != null) {
+
+ // Don't be tolerant of nonexistant domains; this
should be an IP address.
+ Peer p;
+ try {
+ p = new Peer(metadata.get("detected.udp"),
false);
+ } catch (UnknownHostException e) {
+ p = null;
+ Logger.error(this, "detected.udp =
"+metadata.get("detected.udp")+" - "+e, e);
+ } catch (PeerParseException e) {
+ p = null;
+ Logger.error(this, "detected.udp =
"+metadata.get("detected.udp")+" - "+e, e);
+ }
+ if(p != null)
+ detectedPeer = p;
+ }
+
}
}
@@ -386,7 +390,7 @@
if(arkPubKey != null) {
FreenetURI uri = new FreenetURI(arkPubKey);
ClientSSK ssk = new ClientSSK(uri);
- ark = new USK(ssk, myARKNumber);
+ ark = new USK(ssk, arkNo);
}
} catch (MalformedURLException e) {
Logger.error(this, "Couldn't parse ARK info for "+this+": "+e,
e);
@@ -395,8 +399,7 @@
}
if(ark != null) {
- if(myARKNumber != arkNo || myARK != ark) {
- myARKNumber = arkNo;
+ if(myARK == null || (myARK != ark && !myARK.equals(ark))) {
myARK = ark;
return true;
}
@@ -612,6 +615,10 @@
+
node.random.nextInt(Node.RANDOMIZED_TIME_BETWEEN_HANDSHAKE_SENDS);
}
firstHandshake = false;
+ this.handshakeCount++;
+ if(handshakeCount == MAX_HANDSHAKE_COUNT) {
+ arkFetcher.start();
+ }
}
/**
@@ -630,6 +637,10 @@
+
node.random.nextInt(Node.RANDOMIZED_TIME_BETWEEN_HANDSHAKE_SENDS)
+
node.random.nextInt(Node.RANDOMIZED_TIME_BETWEEN_HANDSHAKE_SENDS);
}
+ this.handshakeCount++;
+ if(handshakeCount == MAX_HANDSHAKE_COUNT) {
+ arkFetcher.start();
+ }
}
/**
@@ -809,6 +820,8 @@
*/
public synchronized boolean completedHandshake(long thisBootID, byte[]
data, int offset, int length, BlockCipher encCipher, byte[] encKey, Peer
replyTo, boolean unverified) {
completedHandshake = true;
+ handshakeCount = 0;
+ arkFetcher.stop();
bogusNoderef = false;
try {
// First, the new noderef
@@ -981,13 +994,13 @@
Logger.error(this, "Impossible: e", e);
return;
}
- processNewNoderef(fs);
+ processNewNoderef(fs, false);
}
/**
* Process a new nodereference, as a SimpleFieldSet.
*/
- private void processNewNoderef(SimpleFieldSet fs) throws FSParseException {
+ private void processNewNoderef(SimpleFieldSet fs, boolean forARK) throws
FSParseException {
Logger.minor(this, "Parsing: "+fs);
boolean changedAnything = false;
String identityString = fs.get("identity");
@@ -1003,16 +1016,26 @@
throw new FSParseException(e);
}
String newVersion = fs.get("version");
- if(newVersion == null) throw new FSParseException("No version");
- if(!newVersion.equals(version))
- changedAnything = true;
- version = newVersion;
- Version.seenVersion(newVersion);
+ if(newVersion == null) {
+ // Version may be ommitted for an ARK.
+ if(!forARK)
+ throw new FSParseException("No version");
+ } else {
+ if(!newVersion.equals(version))
+ changedAnything = true;
+ version = newVersion;
+ Version.seenVersion(newVersion);
+ }
String locationString = fs.get("location");
- if(locationString == null) throw new FSParseException("No location");
- Location loc = new Location(locationString);
- if(!loc.equals(currentLocation)) changedAnything = true;
- currentLocation = loc;
+ if(locationString == null) {
+ // Location WILL be ommitted for an ARK.
+ if(!forARK)
+ throw new FSParseException("No location");
+ } else {
+ Location loc = new Location(locationString);
+ if(!loc.equals(currentLocation)) changedAnything = true;
+ currentLocation = loc;
+ }
if(nominalPeer==null)
nominalPeer=new Vector();
@@ -1050,6 +1073,8 @@
}
String name = fs.get("myName");
if(name == null) throw new FSParseException("No name");
+ // In future, ARKs may support automatic transition when the ARK key
is changed.
+ // So parse it anyway. If it fails, no big loss; it won't even log an
error.
if(parseARK(fs))
changedAnything = true;
if(!name.equals(myName)) changedAnything = true;
@@ -1155,7 +1180,7 @@
fs.put("version", version);
fs.put("myName", myName);
if(myARK != null) {
- fs.put("ark.number", Long.toString(this.myARKNumber));
+ fs.put("ark.number", Long.toString(myARK.suggestedEdition));
fs.put("ark.pubURI", myARK.getBaseSSK().toString(false));
}
return fs;
@@ -1417,5 +1442,34 @@
{
return localNodeReceivedMessageTypes;
}
+
+ USK getARK() {
+ return myARK;
+ }
+
+ public void updateARK(FreenetURI newURI) {
+ try {
+ USK usk = USK.create(newURI);
+ if(!myARK.equals(usk, false)) {
+ Logger.error(this, "Changing ARK not supported
(and shouldn't be possible): from "+myARK+" to "+usk);
+ } else if(myARK.suggestedEdition >
usk.suggestedEdition) {
+ Logger.minor(this, "Ignoring ARK edition
decrease: "+myARK.suggestedEdition+" to "+usk.suggestedEdition);
+ } else
+ myARK = usk;
+ } catch (MalformedURLException e) {
+ Logger.error(this, "ARK update failed: Could not parse
permanent redirect (from USK): "+newURI+" : "+e, e);
+ }
+ }
+
+ public void gotARK(SimpleFieldSet fs) {
+ try {
+ processNewNoderef(fs, true);
+ this.handshakeCount = 0;
+ } catch (FSParseException e) {
+ Logger.error(this, "Invalid ARK update: "+e, e);
+ // This is ok as ARKs are limited to 4K anyway.
+ Logger.error(this, "Data was: \n"+fs.toString());
+ }
+ }
}
Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2006-05-16 18:43:30 UTC (rev
8714)
+++ trunk/freenet/src/freenet/node/Version.java 2006-05-16 19:52:59 UTC (rev
8715)
@@ -18,7 +18,7 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- private static final int buildNumber = 709;
+ private static final int buildNumber = 710;
/** Oldest build of Fred we will talk to */
private static final int lastGoodBuild = 591;
Modified: trunk/freenet/src/freenet/support/ArrayBucket.java
===================================================================
--- trunk/freenet/src/freenet/support/ArrayBucket.java 2006-05-16 18:43:30 UTC
(rev 8714)
+++ trunk/freenet/src/freenet/support/ArrayBucket.java 2006-05-16 19:52:59 UTC
(rev 8715)
@@ -11,6 +11,8 @@
/**
* A bucket that stores data in the memory.
*
+ * FIXME: No synchronization, should there be?
+ *
* @author oskar
*/
public class ArrayBucket implements Bucket {
@@ -167,4 +169,19 @@
data.clear();
// Not much else we can do.
}
+
+ public byte[] toByteArray() {
+ long sz = size();
+ int size = (int)sz;
+ byte[] buf = new byte[size];
+ int index = 0;
+ for(Iterator i=data.iterator();i.hasNext();) {
+ byte[] obuf = (byte[]) i.next();
+ System.arraycopy(obuf, 0, buf, index, obuf.length);
+ index += obuf.length;
+ }
+ if(index != buf.length)
+ throw new IllegalStateException();
+ return buf;
+ }
}