Author: toad
Date: 2006-03-02 16:13:21 +0000 (Thu, 02 Mar 2006)
New Revision: 8138

Modified:
   trunk/freenet/src/freenet/client/async/ClientRequest.java
   trunk/freenet/src/freenet/node/Node.java
   trunk/freenet/src/freenet/node/Version.java
   trunk/freenet/src/freenet/node/fcp/ClientGet.java
   trunk/freenet/src/freenet/node/fcp/ClientGetMessage.java
   trunk/freenet/src/freenet/node/fcp/ClientPut.java
   trunk/freenet/src/freenet/node/fcp/ClientRequest.java
   trunk/freenet/src/freenet/node/fcp/FCPClient.java
   trunk/freenet/src/freenet/node/fcp/FCPConnectionHandler.java
   trunk/freenet/src/freenet/node/fcp/FCPConnectionInputHandler.java
   trunk/freenet/src/freenet/node/fcp/FCPServer.java
   trunk/freenet/src/freenet/support/LRUQueue.java
   trunk/freenet/src/freenet/support/SimpleFieldSet.java
Log:
479:
Persistence=forever support.
Very basic: on startup, all persisted requests are restarted from scratch!
The next step is to remember whether each request has finished, and any messages that 
still need to be sent.


Modified: trunk/freenet/src/freenet/client/async/ClientRequest.java
===================================================================
--- trunk/freenet/src/freenet/client/async/ClientRequest.java   2006-02-27 
23:25:20 UTC (rev 8137)
+++ trunk/freenet/src/freenet/client/async/ClientRequest.java   2006-03-02 
16:13:21 UTC (rev 8138)
@@ -56,6 +56,7 @@
                        if(blockSetFinalized) return;
                        blockSetFinalized = true;
                }
+               Logger.minor(this, "Finalized set of blocks for "+this, new 
Exception("debug"));
                notifyClients();
        }


Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java    2006-02-27 23:25:20 UTC (rev 
8137)
+++ trunk/freenet/src/freenet/node/Node.java    2006-03-02 16:13:21 UTC (rev 
8138)
@@ -889,6 +889,8 @@
                        fcpServer = FCPServer.maybeCreate(this, config);
                } catch (IOException e) {
                        throw new NodeInitException(EXIT_COULD_NOT_START_FCP, 
"Could not start FCP: "+e);
+               } catch (InvalidConfigValueException e) {
+                       throw new NodeInitException(EXIT_COULD_NOT_START_FCP, 
"Could not start FCP: "+e);
                }

         // SNMP

Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2006-02-27 23:25:20 UTC (rev 
8137)
+++ trunk/freenet/src/freenet/node/Version.java 2006-03-02 16:13:21 UTC (rev 
8138)
@@ -20,7 +20,7 @@
        public static final String protocolVersion = "1.0";

        /** The build number of the current revision */
-       private static final int buildNumber = 478;
+       private static final int buildNumber = 479;

        /** Oldest build of Fred we will talk to */
        private static final int lastGoodBuild = 475;

Modified: trunk/freenet/src/freenet/node/fcp/ClientGet.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientGet.java   2006-02-27 23:25:20 UTC 
(rev 8137)
+++ trunk/freenet/src/freenet/node/fcp/ClientGet.java   2006-03-02 16:13:21 UTC 
(rev 8138)
@@ -1,9 +1,11 @@
 package freenet.node.fcp;

+import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.net.MalformedURLException;

 import freenet.client.FetchException;
 import freenet.client.FetchResult;
@@ -18,7 +20,9 @@
 import freenet.keys.FreenetURI;
 import freenet.support.Bucket;
 import freenet.support.BucketTools;
+import freenet.support.Fields;
 import freenet.support.Logger;
+import freenet.support.SimpleFieldSet;

 /**
  * A simple client fetch. This can of course fetch arbitrarily large
@@ -64,13 +68,13 @@
                else
                        origHandler = null;
                this.client = handler.getClient();
-               fctx = new FetcherContext(handler.defaultFetchContext, 
FetcherContext.IDENTICAL_MASK);
+               fctx = new FetcherContext(client.defaultFetchContext, 
FetcherContext.IDENTICAL_MASK);
                fctx.eventProducer.addEventListener(this);
                // ignoreDS
                fctx.localRequestOnly = message.dsOnly;
                fctx.ignoreStore = message.ignoreDS;
                fctx.maxNonSplitfileRetries = message.maxRetries;
-               fctx.maxSplitfileBlockRetries = 
Math.max(fctx.maxSplitfileBlockRetries, message.maxRetries);
+               fctx.maxSplitfileBlockRetries = message.maxRetries;
                this.identifier = message.identifier;
                this.verbosity = message.verbosity;
                // FIXME do something with verbosity !!
@@ -80,9 +84,53 @@
                fctx.maxTempLength = message.maxTempSize;
                this.targetFile = message.diskFile;
                this.tempFile = message.tempFile;
-               getter = new ClientGetter(this, handler.node.fetchScheduler, 
uri, fctx, priorityClass, handler);
+               getter = new ClientGetter(this, client.node.fetchScheduler, 
uri, fctx, priorityClass, client);
        }

+       /**
+        * Create a ClientGet from a request serialized to a SimpleFieldSet.
+        * Can throw, and does minimal verification, as is dealing with data 
+        * supposedly serialized out by the node.
+        * @throws MalformedURLException 
+        */
+       public ClientGet(SimpleFieldSet fs, FCPClient client2) throws 
MalformedURLException {
+               uri = new FreenetURI(fs.get("URI"));
+               identifier = fs.get("Identifier");
+               verbosity = Integer.parseInt(fs.get("Verbosity"));
+               priorityClass = Short.parseShort(fs.get("PriorityClass"));
+               returnType = 
ClientGetMessage.parseValidReturnType(fs.get("ReturnType"));
+               persistenceType = 
ClientRequest.parsePersistence(fs.get("Persistence"));
+               if(persistenceType == ClientRequest.PERSIST_CONNECTION)
+                       throw new IllegalArgumentException("Reading persistent 
get with type CONNECTION !!");
+               if(!(persistenceType == ClientRequest.PERSIST_FOREVER || 
persistenceType == ClientRequest.PERSIST_REBOOT))
+                       throw new IllegalArgumentException("Unknown persistence 
type "+ClientRequest.persistenceTypeString(persistenceType));
+               this.client = client2;
+               this.origHandler = null;
+               String f = fs.get("Filename");
+               if(f != null)
+                       targetFile = new File(f);
+               else
+                       targetFile = null;
+               f = fs.get("TempFilename");
+               if(f != null)
+                       tempFile = new File(f);
+               else
+                       tempFile = null;
+               clientToken = fs.get("ClientToken");
+               finished = Boolean.parseBoolean(fs.get("Finished"));
+               boolean ignoreDS = Fields.stringToBool(fs.get("IgnoreDS"), 
false);
+               boolean dsOnly = Fields.stringToBool(fs.get("DSOnly"), false);
+               int maxRetries = Integer.parseInt(fs.get("MaxRetries"));
+               fctx = new FetcherContext(client.defaultFetchContext, 
FetcherContext.IDENTICAL_MASK);
+               fctx.eventProducer.addEventListener(this);
+               // ignoreDS
+               fctx.localRequestOnly = dsOnly;
+               fctx.ignoreStore = ignoreDS;
+               fctx.maxNonSplitfileRetries = maxRetries;
+               fctx.maxSplitfileBlockRetries = maxRetries;
+               getter = new ClientGetter(this, client.node.fetchScheduler, 
uri, fctx, priorityClass, client);
+       }
+
        void start() {
                try {
                        getter.start();
@@ -262,4 +310,51 @@
                return identifier;
        }

+       public void write(BufferedWriter w) throws IOException {
+               if(persistenceType != ClientRequest.PERSIST_REBOOT) {
+                       Logger.error(this, "Not persisting as 
persistenceType="+persistenceType);
+               }
+               // Persist the request to disk
+               SimpleFieldSet fs = getFieldSet();
+               fs.writeTo(w);
+       }
+       
+       // This is distinct from the ClientGetMessage code, as later on it will 
be radically
+       // different (it can store detailed state).
+       public SimpleFieldSet getFieldSet() {
+               SimpleFieldSet fs = new SimpleFieldSet(true); // we will need 
multi-level later...
+               fs.put("Type", "GET");
+               fs.put("URI", uri.toString(false));
+               fs.put("Identifier", identifier);
+               fs.put("Verbosity", Integer.toString(verbosity));
+               fs.put("PriorityClass", Short.toString(priorityClass));
+               fs.put("ReturnType", 
ClientGetMessage.returnTypeString(returnType));
+               fs.put("Persistence", 
ClientRequest.persistenceTypeString(persistenceType));
+               fs.put("ClientName", client.name);
+               if(targetFile != null)
+                       fs.put("Filename", targetFile.getPath());
+               if(tempFile != null)
+                       fs.put("TempFilename", tempFile.getPath());
+               if(clientToken != null)
+                       fs.put("ClientToken", clientToken);
+               if(returnType == ClientGetMessage.RETURN_TYPE_DISK && 
targetFile != null) {
+                       // Otherwise we must re-run it anyway as we don't have 
the data.
+                       
+                       // finished => persistence of completion state, pending 
messages
+                       //fs.put("Finished", Boolean.toString(finished));
+               }
+               fs.put("IgnoreDS", Boolean.toString(fctx.ignoreStore));
+               fs.put("DSOnly", Boolean.toString(fctx.localRequestOnly));
+               fs.put("MaxRetries", 
Integer.toString(fctx.maxNonSplitfileRetries));
+               return fs;
+       }
+
+       public boolean hasFinished() {
+               return finished;
+       }
+
+       public boolean isPersistentForever() {
+               return persistenceType == ClientRequest.PERSIST_FOREVER;
+       }
+
 }

Modified: trunk/freenet/src/freenet/node/fcp/ClientGetMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientGetMessage.java    2006-02-27 
23:25:20 UTC (rev 8137)
+++ trunk/freenet/src/freenet/node/fcp/ClientGetMessage.java    2006-03-02 
16:13:21 UTC (rev 8138)
@@ -81,20 +81,18 @@
                        }
                }
                String returnTypeString = fs.get("ReturnType");
-               if(returnTypeString == null || 
returnTypeString.equalsIgnoreCase("direct")) {
-                       returnType = RETURN_TYPE_DIRECT;
+               returnType = parseReturnType(returnTypeString);
+               if(returnType == RETURN_TYPE_DIRECT) {
                        diskFile = null;
                        tempFile = null;
                        // default just below fproxy
                        defaultPriority = 
RequestStarter.IMMEDIATE_SPLITFILE_PRIORITY_CLASS;
-               } else if(returnTypeString.equalsIgnoreCase("none")) {
+               } else if(returnType == RETURN_TYPE_NONE) {
                        diskFile = null;
                        tempFile = null;
-                       returnType = RETURN_TYPE_NONE;
                        defaultPriority = 
RequestStarter.PREFETCH_PRIORITY_CLASS;
-               } else if(returnTypeString.equalsIgnoreCase("disk")) {
+               } else if(returnType == RETURN_TYPE_DISK) {
                        defaultPriority = 
RequestStarter.BULK_SPLITFILE_PRIORITY_CLASS;
-                       returnType = RETURN_TYPE_DISK;
                        String filename = fs.get("Filename");
                        if(filename == null)
                                throw new 
MessageInvalidException(ProtocolErrorMessage.MISSING_FIELD, "Missing Filename", 
identifier);
@@ -192,10 +190,7 @@
        }

        private String getReturnTypeString() {
-               if(returnType == RETURN_TYPE_DIRECT)
-                       return "direct";
-               else
-                       throw new IllegalStateException("Unknown return type: 
"+returnType);
+               return returnTypeString(returnType);
        }

        public String getName() {
@@ -221,4 +216,25 @@
                }
        }

+       public static short parseReturnType(String string) {
+               if(string == null)
+                       return RETURN_TYPE_DIRECT;
+               if(string.equalsIgnoreCase("direct"))
+                       return RETURN_TYPE_DIRECT;
+               if(string.equalsIgnoreCase("none"))
+                       return RETURN_TYPE_NONE;
+               if(string.equalsIgnoreCase("disk"))
+                       return RETURN_TYPE_DISK;
+               if(string.equalsIgnoreCase("chunked"))
+                       return RETURN_TYPE_CHUNKED;
+               return Short.parseShort(string);
+       }
+
+       public static short parseValidReturnType(String string) {
+               short s = parseReturnType(string);
+               if(s == RETURN_TYPE_DIRECT || s == RETURN_TYPE_NONE || s == 
RETURN_TYPE_DISK)
+                       return s;
+               throw new IllegalArgumentException("Invalid or unsupported 
return type: "+returnTypeString(s));
+       }
+
 }

Modified: trunk/freenet/src/freenet/node/fcp/ClientPut.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientPut.java   2006-02-27 23:25:20 UTC 
(rev 8137)
+++ trunk/freenet/src/freenet/node/fcp/ClientPut.java   2006-03-02 16:13:21 UTC 
(rev 8138)
@@ -1,6 +1,9 @@
 package freenet.node.fcp;

+import java.io.BufferedWriter;
 import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;

 import freenet.client.ClientMetadata;
 import freenet.client.FetchException;
@@ -19,7 +22,10 @@
 import freenet.client.events.SplitfileProgressEvent;
 import freenet.client.events.StartedCompressionEvent;
 import freenet.keys.FreenetURI;
+import freenet.support.Bucket;
 import freenet.support.Logger;
+import freenet.support.SimpleFieldSet;
+import freenet.support.io.FileBucket;

 public class ClientPut extends ClientRequest implements ClientCallback, 
ClientEventListener {

@@ -68,9 +74,8 @@
                else
                        this.origHandler = null;
                client = handler.getClient();
-               ctx = new InserterContext(handler.defaultInsertContext, new 
SimpleEventProducer());
-               if(message.dontCompress)
-                       ctx.dontCompress = true;
+               ctx = new InserterContext(client.defaultInsertContext, new 
SimpleEventProducer());
+               ctx.dontCompress = message.dontCompress;
                ctx.eventProducer.addEventListener(this);
                ctx.maxInsertRetries = message.maxRetries;
                // Now go through the fields one at a time
@@ -78,8 +83,42 @@
                String mimeType = message.contentType;
                clientToken = message.clientToken;
                block = new InsertBlock(message.bucket, new 
ClientMetadata(mimeType), uri);
-               inserter = new ClientPutter(this, message.bucket, uri, new 
ClientMetadata(mimeType), ctx, handler.node.putScheduler, priorityClass, 
getCHKOnly, false, handler);
+               inserter = new ClientPutter(this, message.bucket, uri, new 
ClientMetadata(mimeType), ctx, client.node.putScheduler, priorityClass, 
getCHKOnly, false, client);
        }
+       
+       /**
+        * Create from a persisted SimpleFieldSet.
+        * Not very tolerant of errors, as the input was generated
+        * by the node.
+        * @throws MalformedURLException 
+        */
+       public ClientPut(SimpleFieldSet fs, FCPClient client2) throws 
MalformedURLException {
+               uri = new FreenetURI(fs.get("URI"));
+               identifier = fs.get("Identifier");
+               verbosity = Integer.parseInt(fs.get("Verbosity"));
+               priorityClass = Short.parseShort(fs.get("PriorityClass"));
+               persistenceType = 
ClientRequest.parsePersistence(fs.get("Persistence"));
+               if(persistenceType == ClientRequest.PERSIST_CONNECTION
+                               || persistenceType == 
ClientRequest.PERSIST_REBOOT)
+                       throw new IllegalArgumentException("Reading in 
persistent ClientPut, but persistence type = 
"+ClientRequest.persistenceTypeString(persistenceType)+" so shouldn't have been 
saved in the first place");
+               this.client = client2;
+               origHandler = null;
+               String mimeType = fs.get("Metadata.ContentType");
+               getCHKOnly = Boolean.parseBoolean(fs.get("GetCHKOnly"));
+               boolean dontCompress = 
Boolean.parseBoolean(fs.get("DontCompress"));
+               int maxRetries = Integer.parseInt(fs.get("MaxRetries"));
+               clientToken = fs.get("ClientToken");
+               fromDisk = true;
+               origFilename = new File(fs.get("Filename"));
+               Bucket data = new FileBucket(origFilename, true, false, false);
+               ctx = new InserterContext(client.defaultInsertContext, new 
SimpleEventProducer());
+               ctx.dontCompress = dontCompress;
+               ctx.eventProducer.addEventListener(this);
+               ctx.maxInsertRetries = maxRetries;
+               block = new InsertBlock(data, new ClientMetadata(mimeType), 
uri);
+               inserter = new ClientPutter(this, data, uri, new 
ClientMetadata(mimeType), ctx, client.node.putScheduler, priorityClass, 
getCHKOnly, false, client);
+               start();
+       }

        void start() {
                try {
@@ -214,4 +253,42 @@
                return identifier;
        }

+       public void write(BufferedWriter w) throws IOException {
+               if(persistenceType != ClientRequest.PERSIST_REBOOT) {
+                       Logger.error(this, "Not persisting as 
persistenceType="+persistenceType);
+               }
+               // Persist the request to disk
+               SimpleFieldSet fs = getFieldSet();
+               fs.writeTo(w);
+       }
+       
+       public SimpleFieldSet getFieldSet() throws IOException {
+               SimpleFieldSet fs = new SimpleFieldSet(true); // we will need 
multi-level later...
+               fs.put("Type", "PUT");
+               fs.put("URI", uri.toString(false));
+               fs.put("Identifier", identifier);
+               fs.put("Verbosity", Integer.toString(verbosity));
+               fs.put("PriorityClass", Short.toString(priorityClass));
+               fs.put("Persistence", 
ClientRequest.persistenceTypeString(persistenceType));
+               fs.put("ClientName", client.name);
+               fs.put("Metadata.ContentType", 
block.clientMetadata.getMIMEType());
+               fs.put("GetCHKOnly", Boolean.toString(getCHKOnly));
+               // finished => persistence of completion state, pending messages
+               //fs.put("Finished", Boolean.toString(finished));
+               fs.put("ClientToken", clientToken);
+               if(!fromDisk) throw new 
UnsupportedOperationException("Persistent insert not from disk - NOT 
SUPPORTED");
+               fs.put("Filename", origFilename.getPath());
+               fs.put("DontCompress", Boolean.toString(ctx.dontCompress));
+               fs.put("MaxRetries", Integer.toString(ctx.maxInsertRetries));
+               return fs;
+       }
+
+       public boolean hasFinished() {
+               return finished;
+       }
+
+       public boolean isPersistentForever() {
+               return persistenceType == ClientRequest.PERSIST_FOREVER;
+       }
+
 }

Modified: trunk/freenet/src/freenet/node/fcp/ClientRequest.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientRequest.java       2006-02-27 
23:25:20 UTC (rev 8137)
+++ trunk/freenet/src/freenet/node/fcp/ClientRequest.java       2006-03-02 
16:13:21 UTC (rev 8138)
@@ -1,5 +1,12 @@
 package freenet.node.fcp;

+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+
+import freenet.support.Logger;
+import freenet.support.SimpleFieldSet;
+
 /**
  * A request process carried out by the node for an FCP client.
  * Examples: ClientGet, ClientPut, MultiGet.
@@ -40,4 +47,43 @@
                }
        }

+       public static short parsePersistence(String string) {
+               if(string == null || string.equalsIgnoreCase("connection"))
+                       return PERSIST_CONNECTION;
+               if(string.equalsIgnoreCase("reboot"))
+                       return PERSIST_REBOOT;
+               if(string.equalsIgnoreCase("forever"))
+                       return PERSIST_FOREVER;
+               return Short.parseShort(string);
+       }
+
+       /**
+        * Write a persistent request to disk.
+        * @throws IOException 
+        */
+       public abstract void write(BufferedWriter w) throws IOException;
+
+       public static ClientRequest readAndRegister(BufferedReader br, 
FCPServer server) throws IOException {
+               SimpleFieldSet fs = new SimpleFieldSet(br, true);
+               String clientName = fs.get("ClientName");
+               FCPClient client = server.registerClient(clientName, 
server.node, null);
+               String type = fs.get("Type");
+               if(type.equals("GET")) {
+                       ClientGet cg = new ClientGet(fs, client);
+                       client.register(cg);
+                       return cg;
+               } else if(type.equals("PUT")) {
+                       ClientPut cp = new ClientPut(fs, client);
+                       client.register(cp);
+                       return cp;
+               } else {
+                       Logger.error(ClientRequest.class, "Unrecognized type: 
"+type);
+                       return null;
+               }
+       }
+
+       public abstract boolean hasFinished();
+       
+       public abstract boolean isPersistentForever();
+
 }

Modified: trunk/freenet/src/freenet/node/fcp/FCPClient.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/FCPClient.java   2006-02-27 23:25:20 UTC 
(rev 8137)
+++ trunk/freenet/src/freenet/node/fcp/FCPClient.java   2006-03-02 16:13:21 UTC 
(rev 8138)
@@ -2,7 +2,13 @@

 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Vector;

+import freenet.client.FetcherContext;
+import freenet.client.HighLevelSimpleClient;
+import freenet.client.InserterContext;
+import freenet.node.Node;
 import freenet.support.LRUQueue;

 /**
@@ -14,16 +20,23 @@
        /** Maximum number of unacknowledged completed requests */
        private static final int MAX_UNACKED_REQUESTS = 256;

-       public FCPClient(String name2, FCPConnectionHandler handler) {
+       public FCPClient(String name2, FCPServer server, FCPConnectionHandler 
handler) {
                this.name = name2;
                this.currentConnection = handler;
                this.runningPersistentRequests = new HashSet();
                this.completedUnackedRequests = new LRUQueue();
                this.clientRequestsByIdentifier = new HashMap();
+               this.server = server;
+               this.node = server.node;
+               this.client = node.makeClient((short)0);
+               defaultFetchContext = client.getFetcherContext();
+               defaultInsertContext = client.getInserterContext();
        }

        /** The client's Name sent in the ClientHello message */
        final String name;
+       /** The FCPServer */
+       final FCPServer server;
        /** The current connection handler, if any. */
        private FCPConnectionHandler currentConnection;
        /** Currently running persistent requests */
@@ -32,6 +45,11 @@
        private final LRUQueue completedUnackedRequests;
        /** ClientRequest's by identifier */
        private final HashMap clientRequestsByIdentifier;
+       /** Client (one FCPClient = one HighLevelSimpleClient = one round-robin 
slot) */
+       private final HighLevelSimpleClient client;
+       public final FetcherContext defaultFetchContext;
+       public final InserterContext defaultInsertContext;
+       public final Node node;

        public synchronized FCPConnectionHandler getConnection() {
                return currentConnection;
@@ -92,14 +110,18 @@

        public void register(ClientRequest cg) {
                synchronized(this) {
-                       runningPersistentRequests.add(cg);
+                       if(cg.hasFinished())
+                               completedUnackedRequests.push(cg);
+                       else
+                               runningPersistentRequests.add(cg);
                        clientRequestsByIdentifier.put(cg.getIdentifier(), cg);
                }
        }

        public void removeByIdentifier(String identifier) throws 
MessageInvalidException {
+               ClientRequest req;
                synchronized(this) {
-                       ClientRequest req = (ClientRequest) 
clientRequestsByIdentifier.get(identifier);
+                       req = (ClientRequest) 
clientRequestsByIdentifier.get(identifier);
                        if(req == null)
                                throw new 
MessageInvalidException(ProtocolErrorMessage.NO_SUCH_IDENTIFIER, null, 
identifier);
                        if(runningPersistentRequests.remove(req) || 
completedUnackedRequests.remove(req))
@@ -107,5 +129,23 @@
                        throw new 
MessageInvalidException(ProtocolErrorMessage.NO_SUCH_IDENTIFIER, null, 
identifier);
                }
        }
+
+       public boolean hasPersistentRequests() {
+               return !(runningPersistentRequests.isEmpty() && 
completedUnackedRequests.isEmpty());
+       }
+
+       public void addPersistentRequests(Vector v) {
+               synchronized(this) {
+                       Iterator i = runningPersistentRequests.iterator();
+                       while(i.hasNext()) {
+                               ClientRequest req = (ClientRequest) i.next();
+                               if(req.isPersistentForever())
+                                       v.add(req);
+                       }
+                       Object[] unacked = completedUnackedRequests.toArray();
+                       for(int j=0;j<unacked.length;j++)
+                               v.add(unacked[j]);
+               }
+       }

 }

Modified: trunk/freenet/src/freenet/node/fcp/FCPConnectionHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/FCPConnectionHandler.java        
2006-02-27 23:25:20 UTC (rev 8137)
+++ trunk/freenet/src/freenet/node/fcp/FCPConnectionHandler.java        
2006-03-02 16:13:21 UTC (rev 8138)
@@ -17,7 +17,6 @@
        final Socket sock;
        final FCPConnectionInputHandler inputHandler;
        final FCPConnectionOutputHandler outputHandler;
-       final Node node;
        private boolean isClosed;
        private boolean inputClosed;
        private boolean outputClosed;
@@ -25,19 +24,13 @@
        private FCPClient client;
        final BucketFactory bf;
        final HashMap requestsByIdentifier;
-       final FetcherContext defaultFetchContext;
-       public InserterContext defaultInsertContext;

-       public FCPConnectionHandler(Socket s, Node node, FCPServer server) {
+       public FCPConnectionHandler(Socket s, FCPServer server) {
                this.sock = s;
-               this.node = node;
                this.server = server;
                isClosed = false;
-               this.bf = node.tempBucketFactory;
+               this.bf = server.node.tempBucketFactory;
                requestsByIdentifier = new HashMap();
-               HighLevelSimpleClient client = node.makeClient((short)0);
-               defaultFetchContext = client.getFetcherContext();
-               defaultInsertContext = client.getInserterContext();
                this.inputHandler = new FCPConnectionInputHandler(this);
                this.outputHandler = new FCPConnectionOutputHandler(this);
                inputHandler.start();
@@ -54,6 +47,8 @@
                }
                for(int i=0;i<requests.length;i++)
                        requests[i].onLostConnection();
+               if(client != null && !client.hasPersistentRequests())
+                       server.unregisterClient(client);
        }

        public boolean isClosed() {
@@ -96,7 +91,7 @@

        public void setClientName(String name) {
                this.clientName = name;
-               client = server.registerClient(name, this);
+               client = server.registerClient(name, server.node, this);
        }

        public String getClientName() {
@@ -122,8 +117,11 @@
                        return;
                } else {
                        cg.start();
-                       if(cg.isPersistent())
+                       if(cg.isPersistent()) {
                                client.register(cg);
+                               if(cg.isPersistentForever())
+                                       server.forceStorePersistentRequests();
+                       }
                }
        }

@@ -146,8 +144,11 @@
                        return;
                } else {
                        cp.start();
-                       if(cp.isPersistent())
+                       if(cp.isPersistent()) {
                                client.register(cp);
+                               if(cp.isPersistentForever())
+                                       server.forceStorePersistentRequests();
+                       }
                }
        }


Modified: trunk/freenet/src/freenet/node/fcp/FCPConnectionInputHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/FCPConnectionInputHandler.java   
2006-02-27 23:25:20 UTC (rev 8137)
+++ trunk/freenet/src/freenet/node/fcp/FCPConnectionInputHandler.java   
2006-03-02 16:13:21 UTC (rev 8138)
@@ -74,7 +74,7 @@
                                continue;
                        }
                        try {
-                               msg.run(handler, handler.node);
+                               msg.run(handler, handler.server.node);
                        } catch (MessageInvalidException e) {
                                FCPMessage err = new 
ProtocolErrorMessage(e.protocolCode, false, e.getMessage(), e.ident);
                                handler.outputHandler.queue(err);

Modified: trunk/freenet/src/freenet/node/fcp/FCPServer.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/FCPServer.java   2006-02-27 23:25:20 UTC 
(rev 8137)
+++ trunk/freenet/src/freenet/node/fcp/FCPServer.java   2006-03-02 16:13:21 UTC 
(rev 8138)
@@ -1,15 +1,31 @@
 package freenet.node.fcp;

+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.util.Iterator;
+import java.util.Vector;
 import java.util.WeakHashMap;

+import freenet.client.FetcherContext;
+import freenet.client.HighLevelSimpleClient;
+import freenet.client.InserterContext;
 import freenet.config.BooleanCallback;
 import freenet.config.Config;
 import freenet.config.IntCallback;
 import freenet.config.InvalidConfigValueException;
+import freenet.config.LongCallback;
 import freenet.config.StringCallback;
 import freenet.config.SubConfig;
 import freenet.node.Node;
@@ -26,29 +42,52 @@
        final boolean enabled;
        final String bindto;
        final WeakHashMap clientsByName;
+       private boolean enablePersistentDownloads;
+       private File persistentDownloadsFile;
+       private File persistentDownloadsTempFile;
+       /** Lock for persistence operations.
+        * MUST ALWAYS BE THE OUTERMOST LOCK.
+        */
+       private final Object persistenceSync = new Object();
+       private FCPServerPersister persister;
+       private boolean haveLoadedPersistentRequests;
+       private long persistenceInterval;
+       final FetcherContext defaultFetchContext;
+       public InserterContext defaultInsertContext;

-       public FCPServer(int port, Node node) throws IOException {
-               this.port = port;
-               this.enabled = true;
-               this.bindto = new String("127.0.0.1");
-               this.sock = new ServerSocket(port, 0, 
InetAddress.getByName(bindto));
-               this.node = node;
-               clientsByName = new WeakHashMap();
-               Thread t = new Thread(this, "FCP server");
+       private void startPersister() { // creates the persister, stores it in the persister field and runs it on a daemon thread
+               Thread t = new Thread(persister = new FCPServerPersister(), 
"FCP request persistence handler");
                t.setDaemon(true);
                t.start();
        }
-       
-       public FCPServer(String ipToBindTo, int port, Node node) throws 
IOException {
+
+       /** Tell the running persister to exit, then forget about it. */
+       private void killPersister() {
+               final FCPServerPersister running = persister;
+               running.kill();
+               persister = null;
+       }
+
+       public FCPServer(String ipToBindTo, int port, Node node, boolean 
persistentDownloadsEnabled, String persistentDownloadsDir, long 
persistenceInterval) throws IOException, InvalidConfigValueException {
                this.bindto = new String(ipToBindTo);
+               this.persistenceInterval = persistenceInterval;
                this.port = port;
                this.enabled = true;
                this.sock = new ServerSocket(port, 0, 
InetAddress.getByName(bindto));
                this.node = node;
                clientsByName = new WeakHashMap();
+               // This one is only used to get the default settings. 
Individual FCP conns
+               // will make their own.
+               HighLevelSimpleClient client = node.makeClient((short)0);
+               defaultFetchContext = client.getFetcherContext();
+               defaultInsertContext = client.getInserterContext();
                Thread t = new Thread(this, "FCP server");
+               this.enablePersistentDownloads = persistentDownloadsEnabled;
+               setPersistentDownloadsFile(new File(persistentDownloadsDir)); // despite its name, persistentDownloadsDir is the downloads list file; it is checked even when persistence is disabled
                t.setDaemon(true);
                t.start();
+               if(enablePersistentDownloads) { // note: the server thread started above is already running at this point
+                       loadPersistentRequests();
+                       startPersister();
+               }
        }

        public void run() {
@@ -66,7 +105,7 @@
        private void realRun() throws IOException {
                // Accept a connection
                Socket s = sock.accept();
-               FCPConnectionHandler handler = new FCPConnectionHandler(s, 
node, this);
+               FCPConnectionHandler handler = new FCPConnectionHandler(s, 
this);
        }

        static class FCPPortNumberCallback implements IntCallback {
@@ -107,6 +146,9 @@
                }
        }

+       // FIXME: Consider moving everything except enabled into constructor
+       // Actually we could move enabled in too with an exception???
+       
        static class FCPBindtoCallback implements StringCallback{

                final Node node;
@@ -127,34 +169,153 @@
                }
        }

+       /**
+        * Config callback for the "persistentDownloadsEnabled" FCP option.
+        * NOTE(review): server is assigned by maybeCreate() only when an FCPServer
+        * was actually created; get() and set() throw NullPointerException while it
+        * is still null - confirm the config code never calls them in that state.
+        */
+       static class PersistentDownloadsEnabledCallback implements BooleanCallback {
+
+               FCPServer server;
+
+               public boolean get() {
+                       final boolean current = server.persistentDownloadsEnabled();
+                       return current;
+               }
+
+               public void set(boolean set) {
+                       // Nothing to do if the option already has this value.
+                       if(server.persistentDownloadsEnabled() == set) return;
+                       server.setPersistentDownloadsEnabled(set);
+               }
+
+       }
+
+       /**
+        * Config callback for the "persistentDownloadsFile" FCP option.
+        * NOTE(review): server is assigned by maybeCreate() only when an FCPServer
+        * was actually created; both methods dereference it unconditionally.
+        */
+       static class PersistentDownloadsFileCallback implements StringCallback {
+
+               FCPServer server;
+
+               public String get() {
+                       final File current = server.persistentDownloadsFile;
+                       return current.toString();
+               }
+
+               public void set(String val) throws InvalidConfigValueException {
+                       final File requested = new File(val);
+                       // Only go through validation when the path actually changes.
+                       if(!requested.equals(server.persistentDownloadsFile)) {
+                               server.setPersistentDownloadsFile(requested);
+                       }
+               }
+       }
+
+       /**
+        * Config callback for the "persistentDownloadsInterval" FCP option.
+        * NOTE(review): server is assigned by maybeCreate() only when an FCPServer
+        * was actually created; both methods dereference it unconditionally.
+        */
+       static class PersistentDownloadsIntervalCallback implements LongCallback {
+
+               FCPServer server;
+
+               public long get() {
+                       final long interval = server.persistenceInterval;
+                       return interval;
+               }
+
+               public void set(long value) {
+                       server.persistenceInterval = value;
+                       // Wake the persister, if one is running, so that it re-reads the interval.
+                       final FCPServerPersister running = server.persister;
+                       if(running == null) return;
+                       synchronized(running) {
+                               running.notify();
+                       }
+               }
+       }

-       public static FCPServer maybeCreate(Node node, Config config) throws 
IOException {
+       public static FCPServer maybeCreate(Node node, Config config) throws 
IOException, InvalidConfigValueException {
                SubConfig fcpConfig = new SubConfig("fcp", config);
                fcpConfig.register("enabled", true, 2, true, "Is FCP server 
enabled ?", "Is FCP server enabled ?", new FCPEnabledCallback(node));
                fcpConfig.register("port", 9481 /* anagram of 1984, and 1000 up 
from old number */,
                                2, true, "FCP port number", "FCP port number", 
new FCPPortNumberCallback(node));
                fcpConfig.register("bindto", "127.0.0.1", 2, true, "Ip address 
to bind to", "Ip address to bind the FCP server to", new 
FCPBindtoCallback(node));
+               PersistentDownloadsEnabledCallback cb1;
+               PersistentDownloadsFileCallback cb2;
+               PersistentDownloadsIntervalCallback cb3;
+               fcpConfig.register("persistentDownloadsEnabled", true, 3, true, 
"Enable persistent downloads?", "Whether to enable Persistence=forever for FCP 
requests. Meaning whether to support requests which persist over node restarts; 
they must be written to disk and this may constitute a security risk for some 
people.",
+                               cb1 = new PersistentDownloadsEnabledCallback());
+               boolean persistentDownloadsEnabled = 
fcpConfig.getBoolean("persistentDownloadsEnabled");
+               fcpConfig.register("persistentDownloadsFile", "downloads.dat", 
4, true, "Filename to store persistent downloads in", "Filename to store 
details of persistent downloads to",
+                               cb2 = new PersistentDownloadsFileCallback());
+               String persistentDownloadsDir = 
+                       fcpConfig.getString("persistentDownloadsFile");

+               fcpConfig.register("persistentDownloadsInterval", 
(long)(5*60*1000), 5, true, "Interval between writing persistent downloads to 
disk", "Interval between writing persistent downloads to disk",
+                               cb3 = new 
PersistentDownloadsIntervalCallback());
+               
+               long persistentDownloadsInterval = 
fcpConfig.getLong("persistentDownloadsInterval");
+               
                FCPServer fcp;
                if(fcpConfig.getBoolean("enabled")){
                        Logger.normal(node, "Starting FCP server on 
"+fcpConfig.getString("bindto")+":"+fcpConfig.getInt("port")+".");
-                       fcp = new FCPServer(fcpConfig.getString("bindto"), 
fcpConfig.getInt("port"), node);
+                       fcp = new FCPServer(fcpConfig.getString("bindto"), 
fcpConfig.getInt("port"), node, persistentDownloadsEnabled, 
persistentDownloadsDir, persistentDownloadsInterval);
                        node.setFCPServer(fcp); 
                }else{
                        Logger.normal(node, "Not starting FCP server as it's 
disabled");
                        fcp = null;
                }
+               
+               if(fcp != null) {
+                       cb1.server = fcp;
+                       cb2.server = fcp;
+                       cb3.server = fcp;
+               }
+               
                fcpConfig.finishedInitialization();
                return fcp;
        }

-       public FCPClient registerClient(String name, FCPConnectionHandler 
handler) {
+       /**
+        * Change the file the persistent requests are stored in. Both the file and
+        * its ".tmp" sibling are validated with checkFile() before either field is
+        * updated, so a rejected value leaves the current files untouched.
+        * @param f the new downloads list file
+        * @throws InvalidConfigValueException if either file fails validation
+        */
+       public void setPersistentDownloadsFile(File f) throws InvalidConfigValueException {
+               final File scratch = new File(f.getPath()+".tmp");
+               synchronized(persistenceSync) {
+                       checkFile(f);
+                       checkFile(scratch);
+                       // Both passed validation: switch over.
+                       persistentDownloadsFile = f;
+                       persistentDownloadsTempFile = scratch;
+               }
+       }
+
+       /**
+        * Check that f is usable as the downloads list file. If the file does not
+        * exist yet, this creates it (empty) as part of the check.
+        * @param f the candidate file
+        * @throws InvalidConfigValueException if f is a directory, exists but is not
+        * both readable and writable, has a missing parent directory, or cannot be
+        * created
+        */
+       private void checkFile(File f) throws InvalidConfigValueException {
+               if(f.isDirectory()) {
+                       throw new InvalidConfigValueException("Invalid filename for downloads list: is a directory");
+               }
+               // The file is both read and rewritten, so either permission missing is fatal.
+               if(f.isFile() && !(f.canRead() && f.canWrite())) {
+                       throw new InvalidConfigValueException("File exists but cannot be read and/or written");
+               }
+               File parent = f.getParentFile();
+               if(parent != null && !parent.exists()) {
+                       throw new InvalidConfigValueException("Parent directory does not exist");
+               }
+               if(!f.exists()) {
+                       try {
+                               // createNewFile() returns false if somebody else created the
+                               // file in the meantime, hence the second exists() check.
+                               if(!((f.createNewFile() || f.exists()) && (f.canRead() && f.canWrite()))) {
+                                       throw new InvalidConfigValueException("File does not exist, cannot create it and/or cannot read/write it");
+                               }
+                       } catch (IOException e) {
+                               // Keep the underlying reason visible to whoever set the option.
+                               throw new InvalidConfigValueException("File does not exist and cannot be created: "+e);
+                       }
+               }
+       }
+
+       /**
+        * Turn persistence of requests on or off at runtime. Turning it on loads the
+        * stored requests (unless that already happened) and starts the persister;
+        * turning it off kills the persister.
+        * @param set the new state; a no-op if it equals the current one
+        */
+       public void setPersistentDownloadsEnabled(boolean set) {
+               synchronized(persistenceSync) {
+                       if(enablePersistentDownloads == set) return;
+                       if(!set) {
+                               killPersister();
+                       } else {
+                               if(!haveLoadedPersistentRequests) {
+                                       loadPersistentRequests();
+                               }
+                               startPersister();
+                       }
+                       enablePersistentDownloads = set;
+               }
+       }
+
+       /** @return whether persistent requests are currently enabled */
+       public boolean persistentDownloadsEnabled() {
+               final boolean enabledNow;
+               synchronized(persistenceSync) {
+                       enabledNow = enablePersistentDownloads;
+               }
+               return enabledNow;
+       }
+
+       public FCPClient registerClient(String name, Node node, 
FCPConnectionHandler handler) {
                FCPClient oldClient;
                synchronized(this) {
                        oldClient = (FCPClient) clientsByName.get(name);
                        if(oldClient == null) {
                                // Create new client
-                               FCPClient client = new FCPClient(name, handler);
+                               FCPClient client = new FCPClient(name, this, 
handler);
                                clientsByName.put(name, client);
                                return client;
                        } else {
@@ -172,8 +333,150 @@
                                }
                        }
                }
-               
oldClient.queuePendingMessagesOnConnectionRestart(handler.outputHandler);
+               if(handler != null)
+                       
oldClient.queuePendingMessagesOnConnectionRestart(handler.outputHandler);
                return oldClient;
        }

+       /**
+        * Remove a client from the by-name table.
+        * @param client the client to forget; looked up by its name
+        */
+       public void unregisterClient(FCPClient client) {
+               synchronized(this) {
+                       clientsByName.remove(client.name);
+               }
+       }
+
+       /**
+        * Background task that writes the persistent requests to disk. Each pass
+        * waits until persistenceInterval milliseconds have elapsed, or until
+        * force() or kill() wakes it early, and then calls storePersistentRequests().
+        */
+       class FCPServerPersister implements Runnable {
+
+               /** Set by kill(); run() returns the next time it sees this. */
+               private boolean killed;
+               /** Set by force(); makes run() store without waiting out the interval. */
+               private boolean storeNow;
+
+               /** Ask the persister to store the requests as soon as possible. */
+               public void force() {
+                       synchronized(this) {
+                               storeNow = true;
+                               notifyAll();
+                       }
+               }
+
+               /** Ask the persister thread to exit. */
+               void kill() {
+                       synchronized(this) {
+                               killed = true;
+                               notifyAll();
+                       }
+               }
+
+               public void run() {
+                       while(true) {
+                               synchronized(this) {
+                                       if(killed) return;
+                                       long startTime = System.currentTimeMillis();
+                                       while(!storeNow) {
+                                               // Recomputed on every wake-up, so a changed
+                                               // persistenceInterval takes effect immediately.
+                                               long remaining = (startTime + persistenceInterval) - System.currentTimeMillis();
+                                               if(remaining <= 0) break;
+                                               try {
+                                                       // Never sleep more than 5 seconds at a time, and
+                                                       // never past the deadline. wait(0) would block
+                                                       // forever, which the check above rules out.
+                                                       wait(Math.min(remaining, 5000));
+                                               } catch (InterruptedException e) {
+                                                       // Ignore: the loop re-checks killed and storeNow.
+                                               }
+                                               if(killed) return;
+                                       }
+                                       // Clear the flag while still holding the monitor, so that a
+                                       // force() arriving after this point is not lost.
+                                       storeNow = false;
+                               }
+                               // Called outside our own monitor: storePersistentRequests() takes
+                               // persistenceSync, which must always be the outermost lock.
+                               storePersistentRequests();
+                       }
+               }
+
+       }
+
+       /**
+        * Ask the persister thread to write the persistent requests to disk now.
+        * Logs an error if no persister is running.
+        */
+       public void forceStorePersistentRequests() {
+               Logger.minor(this, "Forcing store persistent requests");
+               // Read the field once: killPersister() can null it on another thread
+               // between the check and the call.
+               FCPServerPersister p = persister;
+               if(p != null) {
+                       p.force();
+               } else {
+                       Logger.error(this, "Persister not running, cannot store persistent requests");
+               }
+       }
+       
+       /**
+        * Store all persistent requests to disk.
+        * The list is written to the temp file and then renamed over the real file;
+        * if writing fails, no rename is attempted and the previous file is kept.
+        * Failures are logged, not thrown.
+        */
+       public void storePersistentRequests() {
+               Logger.minor(this, "Storing persistent requests");
+               ClientRequest[] persistentRequests = getPersistentRequests();
+               synchronized(persistenceSync) {
+                       try {
+                               FileOutputStream fos = new FileOutputStream(persistentDownloadsTempFile);
+                               try {
+                                       BufferedOutputStream bos = new BufferedOutputStream(fos);
+                                       OutputStreamWriter osw = new OutputStreamWriter(bos);
+                                       BufferedWriter w = new BufferedWriter(osw);
+                                       // First line is the number of requests that follow.
+                                       w.write(Integer.toString(persistentRequests.length)+"\n");
+                                       for(int i=0;i<persistentRequests.length;i++)
+                                               persistentRequests[i].write(w);
+                                       w.close();
+                               } finally {
+                                       // Releases the file handle if anything above threw;
+                                       // harmless after a successful w.close().
+                                       fos.close();
+                               }
+                               if(!persistentDownloadsTempFile.renameTo(persistentDownloadsFile)) {
+                                       Logger.minor(this, "Rename failed");
+                                       // renameTo may refuse to replace an existing file:
+                                       // remove the target and try once more.
+                                       persistentDownloadsFile.delete();
+                                       if(!persistentDownloadsTempFile.renameTo(persistentDownloadsFile)) {
+                                               Logger.error(this, "Could not rename persisted requests temp file "+persistentDownloadsTempFile+" to "+persistentDownloadsFile);
+                                       }
+                               }
+                       } catch (IOException e) {
+                               Logger.error(this, "Cannot write persistent requests to disk: "+e, e);
+                       }
+               }
+       }
+
+       /**
+        * Read the persistent downloads file and register every request in it.
+        * The first line holds the number of requests; each request is then read
+        * by ClientRequest.readAndRegister(). A missing or empty file means there
+        * is nothing to load. Failures are logged, not thrown.
+        */
+       private void loadPersistentRequests() {
+               synchronized(persistenceSync) {
+                       // Record the attempt up front, so that re-enabling persistence
+                       // later does not read the file and register the same requests again.
+                       haveLoadedPersistentRequests = true;
+                       FileInputStream fis;
+                       try {
+                               fis = new FileInputStream(persistentDownloadsFile);
+                       } catch (FileNotFoundException e) {
+                               Logger.normal(this, "Not reading any persistent requests from disk because no file exists");
+                               return;
+                       }
+                       BufferedInputStream bis = new BufferedInputStream(fis);
+                       InputStreamReader ris = new InputStreamReader(bis);
+                       BufferedReader br = new BufferedReader(ris);
+                       try {
+                               String r = br.readLine();
+                               if(r == null) {
+                                       // checkFile() creates the file when it is missing, so an
+                                       // empty file just means nothing has been stored yet.
+                                       Logger.normal(this, "Not reading any persistent requests from disk because the file is empty");
+                                       return;
+                               }
+                               int count;
+                               try {
+                                       count = Integer.parseInt(r);
+                               } catch (NumberFormatException e) {
+                                       Logger.error(this, "Corrupt persistent downloads file: "+persistentDownloadsFile);
+                                       return;
+                               }
+                               for(int i=0;i<count;i++) {
+                                       ClientRequest.readAndRegister(br, this);
+                               }
+                       } catch (IOException e) {
+                               Logger.error(this, "Error reading persistent downloads file: "+persistentDownloadsFile+" : "+e, e);
+                       } finally {
+                               // Single close point for every exit path, including runtime exceptions.
+                               try {
+                                       br.close();
+                               } catch (IOException e1) {
+                                       Logger.error(this, "Error closing: "+e1, e1);
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Collect the persistent requests of every registered client.
+        * @return the requests that each client added via addPersistentRequests()
+        */
+       private ClientRequest[] getPersistentRequests() {
+               Vector collected = new Vector();
+               synchronized(this) {
+                       for(Iterator it = clientsByName.values().iterator(); it.hasNext();) {
+                               FCPClient c = (FCPClient) it.next();
+                               c.addPersistentRequests(collected);
+                       }
+               }
+               ClientRequest[] out = new ClientRequest[collected.size()];
+               collected.toArray(out);
+               return out;
+       }
+
 }

Modified: trunk/freenet/src/freenet/support/LRUQueue.java
===================================================================
--- trunk/freenet/src/freenet/support/LRUQueue.java     2006-02-27 23:25:20 UTC 
(rev 8137)
+++ trunk/freenet/src/freenet/support/LRUQueue.java     2006-03-02 16:13:21 UTC 
(rev 8138)
@@ -94,5 +94,9 @@
        public synchronized Object[] toArray() {
                return hash.keySet().toArray();
        }
+
+       public synchronized boolean isEmpty() { // true when the backing hash holds no entries
+               return hash.isEmpty();
+       }
 }


Modified: trunk/freenet/src/freenet/support/SimpleFieldSet.java
===================================================================
--- trunk/freenet/src/freenet/support/SimpleFieldSet.java       2006-02-27 
23:25:20 UTC (rev 8137)
+++ trunk/freenet/src/freenet/support/SimpleFieldSet.java       2006-03-02 
16:13:21 UTC (rev 8138)
@@ -180,6 +180,7 @@

        public void put(String key, String value) {
                int idx;
+               if(value == null) return;
                if((!multiLevel) || (idx = key.indexOf(MULTI_LEVEL_CHAR)) == 
-1) {
                        String x = (String) map.get(key);

@@ -220,6 +221,7 @@
                 w.write(prefix+key+"="+value+"\n");
             } else {
                SimpleFieldSet sfs = (SimpleFieldSet) v;
+               if(sfs == null) throw new NullPointerException();
                sfs.writeTo(w, prefix+key+MULTI_LEVEL_CHAR, true);
             }
         }


Reply via email to