Author: toad
Date: 2006-05-16 19:52:59 +0000 (Tue, 16 May 2006)
New Revision: 8715

Added:
   trunk/freenet/src/freenet/node/FastRunnable.java
Modified:
   trunk/freenet/src/freenet/node/ARKFetcher.java
   trunk/freenet/src/freenet/node/Node.java
   trunk/freenet/src/freenet/node/PacketSender.java
   trunk/freenet/src/freenet/node/PeerNode.java
   trunk/freenet/src/freenet/node/Version.java
   trunk/freenet/src/freenet/support/ArrayBucket.java
Log:
710: Working (apparently) ARKs.

Modified: trunk/freenet/src/freenet/node/ARKFetcher.java
===================================================================
--- trunk/freenet/src/freenet/node/ARKFetcher.java      2006-05-16 18:43:30 UTC 
(rev 8714)
+++ trunk/freenet/src/freenet/node/ARKFetcher.java      2006-05-16 19:52:59 UTC 
(rev 8715)
@@ -1,12 +1,31 @@
 package freenet.node;

+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import freenet.client.FetchException;
+import freenet.client.FetchResult;
+import freenet.client.FetcherContext;
+import freenet.client.InserterException;
+import freenet.client.async.BaseClientPutter;
+import freenet.client.async.ClientCallback;
+import freenet.client.async.ClientGetter;
+import freenet.keys.FreenetURI;
+import freenet.keys.USK;
+import freenet.support.ArrayBucket;
+import freenet.support.Logger;
+import freenet.support.SimpleFieldSet;
+
 /**
  * Fetch an ARK. Permanent, tied to a PeerNode, stops itself after a 
successful fetch.
  */
-public class ARKFetcher {
+public class ARKFetcher implements ClientCallback {

        final PeerNode peer;
        final Node node;
+       private ClientGetter getter;
+       private FreenetURI fetchingURI;
+       private boolean shouldRun = false;

        public ARKFetcher(PeerNode peer, Node node) {
                this.peer = peer;
@@ -21,16 +40,91 @@
         * recent).
         */
        public void start() {
-               // Start fetch
-               // FIXME
+               ClientGetter cg = null;
+               synchronized(this) {
+                       // Start fetch
+                       shouldRun = true;
+                       if(getter == null) {
+                               USK ark = peer.getARK();
+                               if(ark == null) {
+                                       return;
+                               }
+                               FreenetURI uri = ark.getURI();
+                               fetchingURI = uri;
+                               Logger.minor(this, "Fetching ARK: "+uri+" for 
"+peer);
+                               cg = new ClientGetter(this, 
node.chkFetchScheduler, node.sskFetchScheduler, 
+                                               uri, node.arkFetcherContext, 
RequestStarter.INTERACTIVE_PRIORITY_CLASS, 
+                                               this, new ArrayBucket());
+                               getter = cg;
+                       } else return; // already running
+               }
+               if(cg != null)
+                       try {
+                               cg.start();
+                       } catch (FetchException e) {
+                               onFailure(e, cg);
+                       }
        }

        /**
         * Called when the node connects successfully.
         */
-       public void stop() {
+       public synchronized void stop() {
                // Stop fetch
-               // FIXME
+               Logger.minor(this, "Cancelling ARK fetch for "+peer);
+               shouldRun = false;
+               if(getter != null)
+                       getter.cancel();
        }
+
+       public void onSuccess(FetchResult result, ClientGetter state) {
+               Logger.minor(this, "Fetched ARK for "+peer, new 
Exception("debug"));
+               // Fetcher context specifies an upper bound on size.
+               ArrayBucket bucket = (ArrayBucket) result.asBucket();
+               byte[] data = bucket.toByteArray();
+               String ref;
+               try {
+                       ref = new String(data, "UTF-8");
+               } catch (UnsupportedEncodingException e) {
+                       // Yeah, right.
+                       throw new Error(e);
+               }
+               SimpleFieldSet fs;
+               try {
+                       fs = new SimpleFieldSet(ref, true);
+                       peer.gotARK(fs);
+               } catch (IOException e) {
+                       // Corrupt ref.
+                       Logger.error(this, "Corrupt ARK reference? Fetched 
"+fetchingURI+" got while parsing: "+e+" from:\n"+ref, e);
+               }
+       }
+
+       public void onFailure(FetchException e, ClientGetter state) {
+               Logger.minor(this, "Failed to fetch ARK for "+peer+" : "+e, e);
+               // If it's a redirect, follow the redirect and update the ARK.
+               // If it's any other error, wait a while then retry.
+               getter = null;
+               if(!shouldRun) return;
+               if(e.newURI != null) {
+                       peer.updateARK(e.newURI);
+                       start();
+                       return;
+               }
+               // We may be on the PacketSender thread.
+               // FIXME should this be exponential backoff?
+               node.ps.queueTimedJob(new FastRunnable() { public void run() { 
start(); }}, 20000L);
+       }
+
+       public void onSuccess(BaseClientPutter state) {
+               // Impossible.
+       }
+
+       public void onFailure(InserterException e, BaseClientPutter state) {
+               // Impossible.
+       }
+
+       public void onGeneratedURI(FreenetURI uri, BaseClientPutter state) {
+               // Impossible.
+       }

 }

Added: trunk/freenet/src/freenet/node/FastRunnable.java
===================================================================
--- trunk/freenet/src/freenet/node/FastRunnable.java    2006-05-16 18:43:30 UTC 
(rev 8714)
+++ trunk/freenet/src/freenet/node/FastRunnable.java    2006-05-16 19:52:59 UTC 
(rev 8715)
@@ -0,0 +1,8 @@
+package freenet.node;
+
+/**
+ * Runnable which can be executed in-line on the PacketSender.
+ */
+public interface FastRunnable extends Runnable {
+
+}

Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java    2006-05-16 18:43:30 UTC (rev 
8714)
+++ trunk/freenet/src/freenet/node/Node.java    2006-05-16 19:52:59 UTC (rev 
8715)
@@ -32,6 +32,7 @@
 import freenet.client.ClientMetadata;
 import freenet.client.FetchException;
 import freenet.client.FetchResult;
+import freenet.client.FetcherContext;
 import freenet.client.HighLevelSimpleClient;
 import freenet.client.HighLevelSimpleClientImpl;
 import freenet.client.InserterException;
@@ -160,7 +161,9 @@
                        SimpleFieldSet fs = exportPublicFieldSet();

                        // Remove some unnecessary fields that only cause 
collisions.
-                       fs.remove("ark.number");
+                       
+                       // Delete all ark.* fields for now. Changing the ARK and 
automatically moving to the new one may be supported in future.
+                       fs.remove("ark");
                        fs.remove("location");
                        //fs.remove("version"); - keep version because of its 
significance in reconnection

@@ -430,6 +433,8 @@
     private InsertableClientSSK myARK;
     /** My ARK sequence number */
     private long myARKNumber;
+    /** FetcherContext for ARKs */
+       public final FetcherContext arkFetcherContext;

     private final HashSet runningUIDs;

@@ -1222,6 +1227,19 @@

                // And finally, Initialize the plugin manager
                pluginManager = new PluginManager(this);
+               
+               FetcherContext ctx = makeClient((short)0).getFetcherContext();
+               
+               ctx.allowSplitfiles = false;
+               ctx.dontEnterImplicitArchives = true;
+               ctx.maxArchiveRestarts = 0;
+               ctx.maxMetadataSize = 256;
+               ctx.maxNonSplitfileRetries = 10;
+               ctx.maxOutputLength = 4096;
+               ctx.maxRecursionLevel = 2;
+               ctx.maxTempLength = 4096;
+               
+               this.arkFetcherContext = ctx;
     }

        private InetAddress resolve(String val) {

Modified: trunk/freenet/src/freenet/node/PacketSender.java
===================================================================
--- trunk/freenet/src/freenet/node/PacketSender.java    2006-05-16 18:43:30 UTC 
(rev 8714)
+++ trunk/freenet/src/freenet/node/PacketSender.java    2006-05-16 19:52:59 UTC 
(rev 8715)
@@ -1,6 +1,8 @@
 package freenet.node;

 import java.util.LinkedList;
+import java.util.TreeMap;
+import java.util.Vector;

 import freenet.io.comm.DMT;
 import freenet.io.comm.Message;
@@ -18,6 +20,8 @@
 public class PacketSender implements Runnable {

     final LinkedList resendPackets;
+    /** Timed jobs: run time (Long) to a Runnable or Runnable[]; ~= Ticker :) */
+    private final TreeMap timedJobsByTime;
     final Thread myThread;
     final Node node;
     long lastClearedOldSwapChains;
@@ -26,6 +30,7 @@

     PacketSender(Node node) {
         resendPackets = new LinkedList();
+        timedJobsByTime = new TreeMap();
         this.node = node;
         myThread = new Thread(this, "PacketSender thread for 
"+node.portNumber);
         myThread.setDaemon(true);
@@ -142,8 +147,7 @@
                        } else {
                                for(int j=0;j<messages.length;j++) {
                                        Logger.minor(this, "PS Sending: 
"+(messages[j].msg == null ? "(not a Message)" : 
messages[j].msg.getSpec().getName()));
-                                       if (messages[j].msg != null)
-                                       {
+                                       if (messages[j].msg != null) {
                                                
pn.addToLocalNodeSentMessagesToStatistic(messages[j].msg);
                                        }
                                }
@@ -173,8 +177,48 @@
             node.lm.clearOldSwapChains();
             lastClearedOldSwapChains = now;
         }
+        
         // Send may have taken some time
         now = System.currentTimeMillis();
+        
+        Vector jobsToRun = null;
+        
+        synchronized(timedJobsByTime) {
+               while(!timedJobsByTime.isEmpty()) {
+                               Long tRun = (Long) timedJobsByTime.firstKey();
+                               if(tRun.longValue() <= now) {
+                                       if(jobsToRun == null) jobsToRun = new 
Vector();
+                                       Object o = timedJobsByTime.remove(tRun);
+                                       if(o instanceof Runnable[]) {
+                                               Runnable[] r = (Runnable[]) o;
+                                               for(int i=0;i<r.length;i++)
+                                                       jobsToRun.add(r[i]);
+                                       } else {
+                                               Runnable r = (Runnable) o;
+                                               jobsToRun.add(r);
+                                       }
+                               } else break;
+               }
+        }
+
+        if(jobsToRun != null)
+               for(int i=0;i<jobsToRun.size();i++) {
+                       Runnable r = (Runnable) jobsToRun.get(i);
+                       Logger.minor(this, "Running "+r);
+                       if(r instanceof FastRunnable) {
+                               // Run in-line
+                               try {
+                                       r.run();
+                               } catch (Throwable t) {
+                                       Logger.error(this, "Caught "+t+" 
running "+r, t);
+                               }
+                       } else {
+                               Thread t = new Thread(r, "Scheduled job: "+r);
+                               t.setDaemon(true);
+                               t.start();
+                       }
+               }
+        
         long sleepTime = nextActionTime - now;
         // 200ms maximum sleep time
         sleepTime = Math.min(sleepTime, 200);
@@ -204,4 +248,23 @@
             notifyAll();
         }
     }
+
+       public void queueTimedJob(Runnable job, long offset) {
+               long now = System.currentTimeMillis();
+               Long l = new Long(offset + now);
+               synchronized(timedJobsByTime) {
+                       Object o = timedJobsByTime.get(l);
+                       if(o == null) {
+                               timedJobsByTime.put(l, job);
+                       } else if(o instanceof Runnable) {
+                               timedJobsByTime.put(l, new Runnable[] { 
(Runnable)o, job });
+                       } else if(o instanceof Runnable[]) {
+                               Runnable[] r = (Runnable[]) o;
+                               Runnable[] jobs = new Runnable[r.length+1];
+                               System.arraycopy(r, 0, jobs, 0, r.length);
+                               jobs[jobs.length-1] = job;
+                               timedJobsByTime.put(l, jobs);
+                       }
+               }
+       }
 }

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java        2006-05-16 18:43:30 UTC 
(rev 8714)
+++ trunk/freenet/src/freenet/node/PeerNode.java        2006-05-16 19:52:59 UTC 
(rev 8715)
@@ -113,9 +113,12 @@
     /** My ARK SSK public key */
     private USK myARK;

-    /** My ARK sequence number */
-    private long myARKNumber;
+    /** Number of handshake attempts since last successful connection or ARK 
fetch */
+    private int handshakeCount = 0;

+    /** After this many failed handshakes, we start the ARK fetcher. */
+    private static final int MAX_HANDSHAKE_COUNT = 2;
+    
     /** Current location in the keyspace */
     private Location currentLocation;

@@ -348,26 +351,27 @@
         // It belongs to this node, not to the node being described.
         // Therefore, if we are parsing a remotely supplied ref, ignore it.

-        if(!fromLocal) return;
+        if(fromLocal) {

-        SimpleFieldSet metadata = fs.subset("metadata");
-        
-        if(metadata != null) {
-        
-               // Don't be tolerant of nonexistant domains; this should be an 
IP address.
-               Peer p;
-                       try {
-                               p = new Peer(metadata.get("detected.udp"), 
false);
-                       } catch (UnknownHostException e) {
-                               p = null;
-                               Logger.error(this, "detected.udp = 
"+metadata.get("detected.udp")+" - "+e, e);
-                       } catch (PeerParseException e) {
-                               p = null;
-                               Logger.error(this, "detected.udp = 
"+metadata.get("detected.udp")+" - "+e, e);
-                       }
-               if(p != null)
-                       detectedPeer = p;
+               SimpleFieldSet metadata = fs.subset("metadata");

+               if(metadata != null) {
+                       
+                       // Don't be tolerant of nonexistent domains; this 
should be an IP address.
+                       Peer p;
+                       try {
+                               p = new Peer(metadata.get("detected.udp"), 
false);
+                       } catch (UnknownHostException e) {
+                               p = null;
+                               Logger.error(this, "detected.udp = 
"+metadata.get("detected.udp")+" - "+e, e);
+                       } catch (PeerParseException e) {
+                               p = null;
+                               Logger.error(this, "detected.udp = 
"+metadata.get("detected.udp")+" - "+e, e);
+                       }
+                       if(p != null)
+                               detectedPeer = p;
+               }
+               
         }

     }
@@ -386,7 +390,7 @@
                if(arkPubKey != null) {
                        FreenetURI uri = new FreenetURI(arkPubKey);
                        ClientSSK ssk = new ClientSSK(uri);
-                       ark = new USK(ssk, myARKNumber);
+                       ark = new USK(ssk, arkNo);
                }
         } catch (MalformedURLException e) {
                Logger.error(this, "Couldn't parse ARK info for "+this+": "+e, 
e);
@@ -395,8 +399,7 @@
         }

         if(ark != null) {
-               if(myARKNumber != arkNo || myARK != ark) {
-                       myARKNumber = arkNo;
+               if(myARK == null || (myARK != ark && !myARK.equals(ark))) {
                        myARK = ark;
                        return true;
                }
@@ -612,6 +615,10 @@
                        + 
node.random.nextInt(Node.RANDOMIZED_TIME_BETWEEN_HANDSHAKE_SENDS);
         }
         firstHandshake = false;
+        this.handshakeCount++;
+        if(handshakeCount == MAX_HANDSHAKE_COUNT) {
+               arkFetcher.start();
+        }
     }

     /**
@@ -630,6 +637,10 @@
               + 
node.random.nextInt(Node.RANDOMIZED_TIME_BETWEEN_HANDSHAKE_SENDS)
               + 
node.random.nextInt(Node.RANDOMIZED_TIME_BETWEEN_HANDSHAKE_SENDS);
         }
+        this.handshakeCount++;
+        if(handshakeCount == MAX_HANDSHAKE_COUNT) {
+               arkFetcher.start();
+        }
     }

     /**
@@ -809,6 +820,8 @@
      */
     public synchronized boolean completedHandshake(long thisBootID, byte[] 
data, int offset, int length, BlockCipher encCipher, byte[] encKey, Peer 
replyTo, boolean unverified) {
        completedHandshake = true;
+       handshakeCount = 0;
+       arkFetcher.stop();
         bogusNoderef = false;
         try {
             // First, the new noderef
@@ -981,13 +994,13 @@
             Logger.error(this, "Impossible: e", e);
             return;
         }
-        processNewNoderef(fs);
+        processNewNoderef(fs, false);
     }

     /**
      * Process a new nodereference, as a SimpleFieldSet.
      */
-    private void processNewNoderef(SimpleFieldSet fs) throws FSParseException {
+    private void processNewNoderef(SimpleFieldSet fs, boolean forARK) throws 
FSParseException {
         Logger.minor(this, "Parsing: "+fs);
         boolean changedAnything = false;
         String identityString = fs.get("identity");
@@ -1003,16 +1016,26 @@
             throw new FSParseException(e);
                }
         String newVersion = fs.get("version");
-        if(newVersion == null) throw new FSParseException("No version");
-        if(!newVersion.equals(version))
-            changedAnything = true;
-        version = newVersion;
-        Version.seenVersion(newVersion);
+        if(newVersion == null) {
+               // Version may be omitted for an ARK.
+               if(!forARK)
+                       throw new FSParseException("No version");
+        } else {
+               if(!newVersion.equals(version))
+                       changedAnything = true;
+               version = newVersion;
+               Version.seenVersion(newVersion);
+        }
         String locationString = fs.get("location");
-        if(locationString == null) throw new FSParseException("No location");
-        Location loc = new Location(locationString);
-        if(!loc.equals(currentLocation)) changedAnything = true;
-        currentLocation = loc;
+        if(locationString == null) {
+               // Location WILL be omitted for an ARK.
+               if(!forARK)
+                       throw new FSParseException("No location");
+        } else {
+               Location loc = new Location(locationString);
+               if(!loc.equals(currentLocation)) changedAnything = true;
+               currentLocation = loc;
+        }

         if(nominalPeer==null)
                nominalPeer=new Vector();
@@ -1050,6 +1073,8 @@
         }
         String name = fs.get("myName");
         if(name == null) throw new FSParseException("No name");
+        // In future, ARKs may support automatic transition when the ARK key 
is changed.
+        // So parse it anyway. If it fails, no big loss; it won't even log an 
error.
         if(parseARK(fs))
                changedAnything = true;
         if(!name.equals(myName)) changedAnything = true;
@@ -1155,7 +1180,7 @@
         fs.put("version", version);
         fs.put("myName", myName);
         if(myARK != null) {
-               fs.put("ark.number", Long.toString(this.myARKNumber));
+               fs.put("ark.number", Long.toString(myARK.suggestedEdition));
                fs.put("ark.pubURI", myARK.getBaseSSK().toString(false));
         }
         return fs;
@@ -1417,5 +1442,34 @@
        {
                return localNodeReceivedMessageTypes;
        }
+
+       USK getARK() {
+               return myARK;
+       }
+
+       public void updateARK(FreenetURI newURI) {
+               try {
+                       USK usk = USK.create(newURI);
+                       if(!myARK.equals(usk, false)) {
+                               Logger.error(this, "Changing ARK not supported 
(and shouldn't be possible): from "+myARK+" to "+usk);
+                       } else if(myARK.suggestedEdition > 
usk.suggestedEdition) {
+                               Logger.minor(this, "Ignoring ARK edition 
decrease: "+myARK.suggestedEdition+" to "+usk.suggestedEdition);
+                       } else
+                               myARK = usk;
+               } catch (MalformedURLException e) {
+                       Logger.error(this, "ARK update failed: Could not parse 
permanent redirect (from USK): "+newURI+" : "+e, e);
+               }
+       }
+
+       public void gotARK(SimpleFieldSet fs) {
+               try {
+                       processNewNoderef(fs, true);
+                       this.handshakeCount = 0;
+               } catch (FSParseException e) {
+                       Logger.error(this, "Invalid ARK update: "+e, e);
+                       // This is ok as ARKs are limited to 4K anyway.
+                       Logger.error(this, "Data was: \n"+fs.toString());
+               }
+       }
 }


Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2006-05-16 18:43:30 UTC (rev 
8714)
+++ trunk/freenet/src/freenet/node/Version.java 2006-05-16 19:52:59 UTC (rev 
8715)
@@ -18,7 +18,7 @@
        public static final String protocolVersion = "1.0";

        /** The build number of the current revision */
-       private static final int buildNumber = 709;
+       private static final int buildNumber = 710;

        /** Oldest build of Fred we will talk to */
        private static final int lastGoodBuild = 591;

Modified: trunk/freenet/src/freenet/support/ArrayBucket.java
===================================================================
--- trunk/freenet/src/freenet/support/ArrayBucket.java  2006-05-16 18:43:30 UTC 
(rev 8714)
+++ trunk/freenet/src/freenet/support/ArrayBucket.java  2006-05-16 19:52:59 UTC 
(rev 8715)
@@ -11,6 +11,8 @@
 /**
  * A bucket that stores data in the memory.
  * 
+ * FIXME: No synchronization, should there be?
+ * 
  * @author oskar
  */
 public class ArrayBucket implements Bucket {
@@ -167,4 +169,19 @@
                data.clear();
                // Not much else we can do.
        }
+
+       public byte[] toByteArray() {
+               long sz = size();
+               int size = (int)sz;
+               byte[] buf = new byte[size];
+               int index = 0;
+               for(Iterator i=data.iterator();i.hasNext();) {
+                       byte[] obuf = (byte[]) i.next();
+                       System.arraycopy(obuf, 0, buf, index, obuf.length);
+                       index += obuf.length;
+               }
+               if(index != buf.length)
+                       throw new IllegalStateException();
+               return buf;
+       }
 }


Reply via email to